diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 565cb116c0..c6983e3319 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -500,16 +500,25 @@ PendingRequest startSetConfiguration(SetConfigurationRequest request, List newPeers = configurationStagingState.getNewPeers(); Collection newListeners = configurationStagingState.getNewListeners(); - // set the staging state - this.stagingState = configurationStagingState; - - if (newPeers.isEmpty() && newListeners.isEmpty()) { - applyOldNewConf(); + Collection allNew = newListeners.isEmpty() + ? newPeers + : newPeers.isEmpty() + ? newListeners + : Stream.concat(newPeers.stream(), newListeners.stream()) + .collect(Collectors.toList()); + + if (allNew.isEmpty()) { + applyOldNewConf(configurationStagingState); } else { // update the LeaderState's sender list - addAndStartSenders(newPeers); - addAndStartSenders(newListeners); + Collection newAppenders = addSenders(allNew); + + // set the staging state + stagingState = configurationStagingState; + + newAppenders.forEach(LogAppender::start); } + return pending; } @@ -579,14 +588,14 @@ private void commitIndexChanged() { notifySenders(); } - private void applyOldNewConf() { + private void applyOldNewConf(ConfigurationStagingState stage) { final ServerState state = server.getState(); final RaftConfigurationImpl current = state.getRaftConf(); - final RaftConfigurationImpl oldNewConf= stagingState.generateOldNewConf(current, state.getLog().getNextIndex()); + final long nextIndex = state.getLog().getNextIndex(); + final RaftConfigurationImpl oldNewConf = stage.generateOldNewConf(current, nextIndex); // apply the (old, new) configuration to log, and use it as the current conf appendConfiguration(oldNewConf); - this.stagingState = null; notifySenders(); } @@ -607,7 +616,7 @@ void updateFollowerCommitInfos(CommitInfoCache cache, List prot @Override public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower, List entries, TermIndex previous, long callId) { - final boolean initializing = isCaughtUp(follower); + final boolean initializing = !isCaughtUp(follower); final RaftPeerId targetId = follower.getId(); return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries, ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()), @@ -618,9 +627,13 @@ public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follo * Update sender list for setConfiguration request */ private void addAndStartSenders(Collection newPeers) { - if (!newPeers.isEmpty()) { - addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::start); - } + addSenders(newPeers).forEach(LogAppender::start); + } + + private Collection addSenders(Collection newPeers) { + return !newPeers.isEmpty() + ? addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false) + : Collections.emptyList(); } private RaftPeer getPeer(RaftPeerId id) { @@ -811,20 +824,22 @@ private void checkStaging() { } else { final long commitIndex = server.getState().getLog().getLastCommittedIndex(); // check progress for the new followers - final EnumSet reports = getLogAppenders() + final List laggingFollowers = getLogAppenders() .map(LogAppender::getFollower) .filter(follower -> !isCaughtUp(follower)) + .map(FollowerInfoImpl.class::cast) + .collect(Collectors.toList()); + final EnumSet reports = laggingFollowers.stream() .map(follower -> checkProgress(follower, commitIndex)) .collect(Collectors.toCollection(() -> EnumSet.noneOf(BootStrapProgress.class))); if (reports.contains(BootStrapProgress.NOPROGRESS)) { stagingState.fail(BootStrapProgress.NOPROGRESS); } else if (!reports.contains(BootStrapProgress.PROGRESSING)) { // all caught up! - applyOldNewConf(); - getLogAppenders() - .map(LogAppender::getFollower) + applyOldNewConf(stagingState); + this.stagingState = null; + laggingFollowers.stream() .filter(f -> server.getRaftConf().containsInConf(f.getId())) - .map(FollowerInfoImpl.class::cast) .forEach(FollowerInfoImpl::catchUp); } }