Skip to content

Commit

Permalink
merge: #8485
Browse files Browse the repository at this point in the history
8485: [Backport release-1.3.0] Fix flaky RaftTest r=deepthidevaki a=github-actions[bot]

# Description
Backport of #8474 to `release-1.3.0`.

relates to #8394

Co-authored-by: Deepthi Devaki Akkoorath <[email protected]>
  • Loading branch information
zeebe-bors-cloud[bot] and deepthidevaki authored Dec 28, 2021
2 parents be8c605 + b7785ef commit b39bb55
Showing 1 changed file with 29 additions and 22 deletions.
51 changes: 29 additions & 22 deletions atomix/cluster/src/test/java/io/atomix/raft/RaftTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand All @@ -37,6 +33,7 @@
import io.atomix.raft.RaftServer.Role;
import io.atomix.raft.cluster.RaftMember;
import io.atomix.raft.metrics.RaftRoleMetrics;
import io.atomix.raft.partition.RaftPartitionConfig;
import io.atomix.raft.primitive.TestMember;
import io.atomix.raft.protocol.TestRaftProtocolFactory;
import io.atomix.raft.protocol.TestRaftServerProtocol;
Expand All @@ -57,6 +54,7 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -176,7 +174,12 @@ private RaftServer createServer(
final RaftServer.Builder defaults =
RaftServer.builder(memberId)
.withMembershipService(mock(ClusterMembershipService.class))
.withProtocol(protocol);
.withProtocol(protocol)
.withPartitionConfig(
new RaftPartitionConfig()
.setElectionTimeout(Duration.ofSeconds(1))
.setHeartbeatInterval(Duration.ofMillis(100)));

final RaftServer server = configurator.apply(defaults).build();

serverProtocols.put(memberId, protocol);
Expand Down Expand Up @@ -210,7 +213,7 @@ public void testTransferLeadership() throws Throwable {
final RaftServer follower = servers.stream().filter(RaftServer::isFollower).findFirst().get();
follower.promote().thenRun(this::resume);
await(15000, 1001);
assertTrue(follower.isLeader());
assertThat(follower.isLeader()).isTrue();
}

/** Tests demoting the leader. */
Expand All @@ -222,13 +225,13 @@ public void testDemoteLeader() throws Throwable {
servers.stream()
.filter(s -> s.cluster().getLocalMember().equals(s.cluster().getLeader()))
.findFirst()
.get();
.orElseThrow();

final RaftServer follower =
servers.stream()
.filter(s -> !s.cluster().getLocalMember().equals(s.cluster().getLeader()))
.findFirst()
.get();
.orElseThrow();

follower
.cluster()
Expand Down Expand Up @@ -338,7 +341,7 @@ private void waitUntil(final BooleanSupplier condition, int retries) {
throw new RuntimeException(e);
}

assertTrue(condition.getAsBoolean());
assertThat(condition.getAsBoolean()).isTrue();
}

@Test
Expand All @@ -363,7 +366,7 @@ public void testRoleChangeNotificationAfterInitialEntryOnLeader() throws Throwab
previousLeader.stepDown();

// then
assertTrue(transitionCompleted.await(1000, TimeUnit.SECONDS));
assertThat(transitionCompleted.await(1000, TimeUnit.SECONDS)).isTrue();
}

private Optional<RaftServer> getLeader(final List<RaftServer> servers) {
Expand All @@ -386,7 +389,7 @@ private void assertLastReadInitialEntry(
final IndexedRaftLogEntry entry = raftLogReader.next();

assertThat(entry.entry()).isInstanceOf(InitialEntry.class);
assertEquals(term, entry.term());
assertThat(entry.term()).isEqualTo(term);
transitionCompleted.countDown();
}
}
Expand Down Expand Up @@ -422,7 +425,7 @@ public void testNotifyOnFailure() throws Throwable {
@Test
public void shouldLeaderStepDownOnDisconnect() throws Throwable {
final List<RaftServer> servers = createServers(3);
final RaftServer leader = getLeader(servers).get();
final RaftServer leader = getLeader(servers).orElseThrow();
final MemberId leaderId = leader.getContext().getCluster().getLocalMember().memberId();

final CountDownLatch stepDownListener = new CountDownLatch(1);
Expand All @@ -437,15 +440,15 @@ public void shouldLeaderStepDownOnDisconnect() throws Throwable {
protocolFactory.partition(leaderId);

// then
assertTrue(stepDownListener.await(30, TimeUnit.SECONDS));
assertFalse(leader.isLeader());
assertThat(stepDownListener.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(leader.isLeader()).isFalse();
}

@Test
public void shouldReconnect() throws Throwable {
// given
final List<RaftServer> servers = createServers(3);
final RaftServer leader = getLeader(servers).get();
final RaftServer leader = getLeader(servers).orElseThrow();
final MemberId leaderId = leader.getContext().getCluster().getLocalMember().memberId();
final AtomicLong commitIndex = new AtomicLong();
leader.getContext().addCommitListener(commitIndex::set);
Expand All @@ -455,7 +458,7 @@ public void shouldReconnect() throws Throwable {

// when
final var newLeader = servers.stream().filter(RaftServer::isLeader).findFirst().orElseThrow();
assertNotEquals(newLeader, leader);
assertThat(leader).isNotEqualTo(newLeader);
final var secondCommit = appendEntry(newLeader);
protocolFactory.heal(leaderId);

Expand All @@ -467,7 +470,7 @@ public void shouldReconnect() throws Throwable {
public void shouldFailOverOnLeaderDisconnect() throws Throwable {
final List<RaftServer> servers = createServers(3);

final RaftServer leader = getLeader(servers).get();
final RaftServer leader = getLeader(servers).orElseThrow();
final MemberId leaderId = leader.getContext().getCluster().getLocalMember().memberId();

final CountDownLatch newLeaderElected = new CountDownLatch(1);
Expand All @@ -485,8 +488,8 @@ public void shouldFailOverOnLeaderDisconnect() throws Throwable {
protocolFactory.partition(leaderId);

// then
assertTrue(newLeaderElected.await(30, TimeUnit.SECONDS));
assertNotEquals(newLeaderId.get(), leaderId);
assertThat(newLeaderElected.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(leaderId).isNotEqualTo(newLeaderId.get());
}

@Test
Expand Down Expand Up @@ -553,17 +556,21 @@ private RaftServer recreateServer(final RaftServer server, final MemberId member
public void shouldTriggerHeartbeatTimeouts() throws Throwable {
final List<RaftServer> servers = createServers(3);
final List<RaftServer> followers = getFollowers(servers);
final MemberId followerId =
followers.get(0).getContext().getCluster().getLocalMember().memberId();
final RaftServer follower = followers.get(0);
final MemberId followerId = follower.getContext().getCluster().getLocalMember().memberId();

// when
final TestRaftServerProtocol followerServer = serverProtocols.get(followerId);
Mockito.clearInvocations(followerServer);
protocolFactory.partition(followerId);

// then
// With priority election enabled the lowest priority node can wait upto 3 * electionTimeout
// before triggering election.
final var timeout = follower.getContext().getElectionTimeout().multipliedBy(4).toMillis();

// should send poll requests to 2 nodes
verify(followerServer, timeout(5000).atLeast(2)).poll(any(), any());
verify(followerServer, timeout(timeout).atLeast(2)).poll(any(), any());
}

@Test
Expand Down

0 comments on commit b39bb55

Please sign in to comment.