From fc39c38eab115bd8349d753b2b9087c7f9398e0e Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Sun, 20 Oct 2024 03:31:08 -0700 Subject: [PATCH] RATIS-2173. Fix zero-copy bugs for non-gRPC cases. (#1167) --- .../org/apache/ratis/TestMultiRaftGroup.java | 11 +-- .../server/leader/LogAppenderDefault.java | 30 +++--- .../server/raftlog/memory/MemoryRaftLog.java | 23 ++--- .../java/org/apache/ratis/RaftBasicTests.java | 6 -- .../java/org/apache/ratis/RaftTestUtil.java | 7 +- .../server/impl/LeaderElectionTests.java | 91 ++++++++----------- 6 files changed, 66 insertions(+), 102 deletions(-) diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java index 190f758589..ea3962c088 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java +++ b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java @@ -22,25 +22,20 @@ import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine; import org.apache.ratis.examples.arithmetic.TestArithmetic; import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.GroupManagementBaseTest; import org.apache.ratis.server.impl.MiniRaftCluster; -import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.function.CheckedBiConsumer; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import org.slf4j.event.Level; import java.io.IOException; import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; +@Timeout(value = 300) public class TestMultiRaftGroup extends BaseTest { - static { - Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG); - } - - public static Collection data() throws IOException { + public static Collection data() { return ParameterizedBaseTest.getMiniRaftClusters(ArithmeticStateMachine.class, 0); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java index f75a80f825..8ec6c19db1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java @@ -59,23 +59,15 @@ public Comparator getCallIdComparator() { /** Send an appendEntries RPC; retry indefinitely. */ private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestFirstIndex) throws InterruptedException, InterruptedIOException, RaftLogIOException { - int retry = 0; - - ReferenceCountedObject request = nextAppendEntriesRequest( - CallId.getAndIncrement(), false); - while (isRunning()) { // keep retrying for IOException + for(int retry = 0; isRunning(); retry++) { + final ReferenceCountedObject request = nextAppendEntriesRequest( + CallId.getAndIncrement(), false); + if (request == null) { + LOG.trace("{} no entries to send now, wait ...", this); + return null; + } try { - if (request == null || request.get().getEntriesCount() == 0) { - if (request != null) { - request.release(); - } - request = nextAppendEntriesRequest(CallId.getAndIncrement(), false); - } - - if (request == null) { - LOG.trace("{} no entries to send now, wait ...", this); - return null; - } else if (!isRunning()) { + if (!isRunning()) { LOG.info("{} is stopped. Skip appendEntries.", this); return null; } @@ -84,17 +76,19 @@ private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestF final AppendEntriesReplyProto reply = sendAppendEntries(proto); final long first = proto.getEntriesCount() > 0 ? proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX; requestFirstIndex.set(first); - request.release(); return reply; } catch (InterruptedIOException | RaftLogIOException e) { throw e; } catch (IOException ioe) { // TODO should have more detailed retry policy here. - if (retry++ % 10 == 0) { // to reduce the number of messages + if (retry % 10 == 0) { // to reduce the number of messages LOG.warn("{}: Failed to appendEntries (retry={})", this, retry, ioe); } handleException(ioe); + } finally { + request.release(); } + if (isRunning()) { getServer().properties().rpcSleepTime().sleep(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java index 2aac6c1b1f..f4b6dc452e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java @@ -45,15 +45,10 @@ */ public class MemoryRaftLog extends RaftLogBase { static class EntryList { - private final List> entries = new ArrayList<>(); - - ReferenceCountedObject getRef(int i) { - return i >= 0 && i < entries.size() ? entries.get(i) : null; - } + private final List entries = new ArrayList<>(); LogEntryProto get(int i) { - final ReferenceCountedObject ref = getRef(i); - return ref != null ? ref.get() : null; + return i >= 0 && i < entries.size() ? entries.get(i) : null; } TermIndex getTermIndex(int i) { @@ -81,13 +76,10 @@ void purge(int index) { } void clear(int from, int to) { - List> subList = entries.subList(from, to); - subList.forEach(ReferenceCountedObject::release); - subList.clear(); + entries.subList(from, to).clear(); } - void add(ReferenceCountedObject entryRef) { - entryRef.retain(); + void add(LogEntryProto entryRef) { entries.add(entryRef); } } @@ -128,7 +120,8 @@ public LogEntryProto get(long index) throws RaftLogIOException { public ReferenceCountedObject retainLog(long index) { checkLogState(); try (AutoCloseableLock readLock = readLock()) { - ReferenceCountedObject ref = entries.getRef(Math.toIntExact(index)); + final LogEntryProto entry = entries.get(Math.toIntExact(index)); + final ReferenceCountedObject ref = ReferenceCountedObject.wrap(entry); ref.retain(); return ref; } @@ -205,7 +198,7 @@ protected CompletableFuture appendEntryImpl(ReferenceCountedObject> appendImpl(ReferenceCountedObject extends BaseTest implements MiniRaftCluster.Factory.Get { { - Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG); - RaftServerTestUtil.setStateMachineUpdaterLogLevel(Level.DEBUG); - RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5, TimeUnit.SECONDS)); } diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index be8739ad8e..7339937014 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -187,8 +187,11 @@ static boolean logEntriesContains(RaftLog log, long startIndex, long endIndex, S log.get(termIndices[idxEntries].getIndex()).getStateMachineLogEntry().getLogData().toByteArray())) { ++idxExpected; } - } catch (IOException e) { - throw new RuntimeException(e); + } catch (Exception e) { + throw new IllegalStateException("Failed logEntriesContains: startIndex=" + startIndex + + ", endIndex=" + endIndex + + ", #expectedMessages=" + expectedMessages.length + + ", log=" + log, e); } ++idxEntries; } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index 6a5c6387cc..bda496c165 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -21,8 +21,8 @@ import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.metrics.impl.RatisMetricRegistryImpl; import org.apache.ratis.metrics.impl.DefaultTimekeeperImpl; +import org.apache.ratis.metrics.impl.RatisMetricRegistryImpl; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftGroupId; @@ -37,19 +37,18 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.metrics.LeaderElectionMetrics; import org.apache.ratis.server.protocol.TermIndex; -import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils; -import org.apache.ratis.util.ExitUtils; +import org.apache.ratis.thirdparty.com.codahale.metrics.Timer; +import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.Timestamp; import org.apache.ratis.util.function.CheckedBiConsumer; -import org.apache.ratis.util.CodeInjectionForTesting; -import org.junit.Assert; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.slf4j.Logger; +import org.slf4j.event.Level; import java.io.IOException; import java.util.ArrayList; @@ -66,18 +65,15 @@ import static org.apache.ratis.RaftTestUtil.waitForLeader; import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LAST_LEADER_ELECTION_ELAPSED_TIME; import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECTION_COUNT_METRIC; -import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECTION_TIME_TAKEN; import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECTION_TIMEOUT_COUNT_METRIC; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.junit.Assert.assertNotNull; +import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECTION_TIME_TAKEN; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.ratis.thirdparty.com.codahale.metrics.Timer; -import org.slf4j.event.Level; - public abstract class LeaderElectionTests extends BaseTest implements MiniRaftCluster.Factory.Get { @@ -89,15 +85,16 @@ public abstract class LeaderElectionTests @Test public void testBasicLeaderElection() throws Exception { LOG.info("Running testBasicLeaderElection"); - final MiniRaftCluster cluster = newCluster(5); - cluster.start(); + runWithNewCluster(5, this::runTestBasicLeaderElection); + } + + void runTestBasicLeaderElection(MiniRaftCluster cluster) throws Exception { RaftTestUtil.waitAndKillLeader(cluster); RaftTestUtil.waitAndKillLeader(cluster); RaftTestUtil.waitAndKillLeader(cluster); testFailureCase("waitForLeader after killed a majority of servers", () -> RaftTestUtil.waitForLeader(cluster, null, false), IllegalStateException.class); - cluster.shutdown(); } static class SleepCode implements CodeInjectionForTesting.Code { @@ -124,9 +121,11 @@ public void testWaitServerReady() throws Exception { final int sleepMs = 1000 + ThreadLocalRandom.current().nextInt(1000); LOG.info("Running testWaitServerReady, sleep = {}ms", sleepMs); CodeInjectionForTesting.put(RaftServerImpl.START_COMPLETE, new SleepCode(sleepMs)); - final MiniRaftCluster cluster = newCluster(1); final Timestamp startTime = Timestamp.currentTime(); - cluster.start(); + runWithNewCluster(1, c -> runTestWaitServerReady(c, sleepMs, startTime)); + } + + void runTestWaitServerReady(MiniRaftCluster cluster, int sleepMs, Timestamp startTime) throws Exception { LOG.info("Cluster started at {}ms", startTime.elapsedTimeMs()); final RaftGroupId groupId = cluster.getGroupId(); final RaftServerImpl server = (RaftServerImpl) cluster.getServers().iterator().next().getDivision(groupId); @@ -138,16 +137,17 @@ public void testWaitServerReady() throws Exception { final long elapsedMs = startTime.elapsedTimeMs(); // allow a small difference to tolerate system timer inaccuracy Assertions.assertTrue(elapsedMs > sleepMs - 10, () -> "elapseMs = " + elapsedMs + " but sleepMs = " + sleepMs); - cluster.shutdown(); CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE); } @Test - public void testAddServerForWaitReady() throws IOException, InterruptedException { + public void testAddServerForWaitReady() throws Exception { LOG.info("Running testAddServerForWaitReady"); // normal startup cluster with 3 server - final MiniRaftCluster cluster = newCluster(3); - cluster.start(); + runWithNewCluster(3, this::runTestAddServerForWaitReady); + } + + void runTestAddServerForWaitReady(MiniRaftCluster cluster) throws Exception { RaftTestUtil.waitForLeader(cluster); try (RaftClient client = cluster.createClient()) { for (int i = 0; i < 10; ++i) { @@ -162,7 +162,7 @@ public void testAddServerForWaitReady() throws IOException, InterruptedException RaftClientReply reply = client.admin().setConfiguration(SetConfigurationRequest.Arguments.newBuilder() .setServersInNewConf(peerChanges.newPeers) .setMode(SetConfigurationRequest.Mode.ADD).build()); - Assert.assertTrue(reply.isSuccess()); + assertTrue(reply.isSuccess()); for (RaftServer server : cluster.getServers()) { RaftServerProxy proxy = (RaftServerProxy) server; proxy.getImpls().forEach(s -> { @@ -170,24 +170,20 @@ public void testAddServerForWaitReady() throws IOException, InterruptedException }); } } - cluster.shutdown();; CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE); } @Test public void testChangeLeader() throws Exception { - SegmentedRaftLogTestUtils.setRaftLogWorkerLogLevel(Level.TRACE); LOG.info("Running testChangeLeader"); - final MiniRaftCluster cluster = newCluster(3); - cluster.start(); + runWithNewCluster(3, this::runTestChangeLeader); + } + void runTestChangeLeader(MiniRaftCluster cluster) throws Exception { RaftPeerId leader = RaftTestUtil.waitForLeader(cluster).getId(); for(int i = 0; i < 10; i++) { leader = RaftTestUtil.changeLeader(cluster, leader, IllegalStateException::new); - ExitUtils.assertNotTerminated(); } - SegmentedRaftLogTestUtils.setRaftLogWorkerLogLevel(Level.INFO); - cluster.shutdown(); } @Test @@ -273,8 +269,6 @@ public void testTransferLeader() throws Exception { Assertions.assertEquals(newLeader.getId().toString(), reply.getReplierId()); Assertions.assertTrue(reply.isSuccess()); } - - cluster.shutdown(); } } @@ -307,8 +301,6 @@ public void testYieldLeaderToHigherPriority() throws Exception { Assertions.assertEquals(newLeader.getId().toString(), reply.getReplierId()); Assertions.assertTrue(reply.isSuccess()); } - - cluster.shutdown(); } } @@ -366,8 +358,6 @@ public void testTransferLeaderTimeout() throws Exception { RaftTestUtil.deIsolate(cluster, newLeader.getId()); } - - cluster.shutdown(); } } @@ -405,14 +395,18 @@ static void enforceLeader(MiniRaftCluster cluster, final String newLeader, Logge @Test public void testLateServerStart() throws Exception { - final int numServer = 3; LOG.info("Running testLateServerStart"); - final MiniRaftCluster cluster = newCluster(numServer); + try (final MiniRaftCluster cluster = newCluster(3)) { + runTestLateServerStart(cluster); + } + } + + void runTestLateServerStart(MiniRaftCluster cluster) throws Exception { cluster.initServers(); // start all except one servers final Iterator i = cluster.getServers().iterator(); - for(int j = 1; j < numServer; j++) { + for(int j = 1; j < cluster.getNumServers(); j++) { i.next().start(); } @@ -430,7 +424,6 @@ public void testLateServerStart() throws Exception { 10, ONE_SECOND, "getLeaderId", LOG); LOG.info(cluster.printServers()); Assertions.assertEquals(leader.getId(), lastServerLeaderId); - cluster.shutdown(); } protected void testDisconnectLeader() throws Exception { @@ -448,8 +441,6 @@ protected void testDisconnectLeader() throws Exception { } finally { RaftTestUtil.deIsolate(cluster, leader.getId()); } - - cluster.shutdown(); } } @@ -471,7 +462,6 @@ public void testAddListener() throws Exception { Assertions.assertEquals(1, listener.size()); Assertions.assertEquals(changes.newPeers[0].getId(), new ArrayList<>(listener).get(0).getId()); } - cluster.shutdown(); } } @@ -497,7 +487,6 @@ public void testAddFollowerWhenExistsListener() throws Exception { Assertions.assertEquals(1, leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER).size()); } - cluster.shutdown(); } } @@ -516,7 +505,6 @@ public void testRemoveListener() throws Exception { Assertions.assertTrue(reply.isSuccess()); Assertions.assertEquals(0, leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER).size()); } - cluster.shutdown(); } } @@ -539,7 +527,6 @@ public void testChangeFollowerToListener() throws Exception { Assertions.assertEquals(1, peer.size()); Assertions.assertEquals(listeners.get(0).getId(), new ArrayList<>(peer).get(0).getId()); } - cluster.shutdown(); } } @@ -558,15 +545,16 @@ public void testChangeListenerToFollower() throws Exception { Collection peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER); Assertions.assertEquals(0, peer.size()); } - cluster.shutdown(); } } @Test - public void testLeaderElectionMetrics() throws IOException, InterruptedException { + public void testLeaderElectionMetrics() throws Exception { + runWithNewCluster(3, this::runTestLeaderElectionMetrics); + } + + void runTestLeaderElectionMetrics(MiniRaftCluster cluster) throws Exception { Timestamp timestamp = Timestamp.currentTime(); - final MiniRaftCluster cluster = newCluster(3); - cluster.start(); final RaftServer.Division leaderServer = waitForLeader(cluster); final RatisMetricRegistryImpl ratisMetricRegistry = (RatisMetricRegistryImpl) @@ -588,7 +576,6 @@ public void testLeaderElectionMetrics() throws IOException, InterruptedException Long leaderElectionLatency = (Long) ratisMetricRegistry.getGauges((s, metric) -> s.contains(LAST_LEADER_ELECTION_ELAPSED_TIME)).values().iterator().next().getValue(); assertTrue(leaderElectionLatency > 0L); - cluster.shutdown(); } @Test @@ -654,8 +641,6 @@ public void testPreVote() { reply = client.io().send(new RaftTestUtil.SimpleMessage("message")); Assertions.assertTrue(reply.isSuccess()); } - - cluster.shutdown(); } catch (Exception e) { fail(e.getMessage()); }