Skip to content

Commit

Permalink
feat(raft): make requestTimeout configurable
Browse files Browse the repository at this point in the history
(cherry picked from commit 20cc51e)
  • Loading branch information
deepthidevaki authored and github-actions[bot] committed Jul 1, 2021
1 parent 3f67203 commit 9bc3ab3
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package io.atomix.raft.partition;

import com.esotericsoftware.kryo.serializers.FieldSerializer.Optional;
import io.atomix.primitive.partition.PartitionGroup;
import io.atomix.primitive.partition.PartitionGroup.Type;
import io.atomix.primitive.partition.PartitionGroupConfig;
import io.atomix.raft.zeebe.EntryValidator;
import io.atomix.raft.zeebe.NoopEntryValidator;
Expand All @@ -32,6 +32,7 @@ public class RaftPartitionGroupConfig extends PartitionGroupConfig<RaftPartition
private static final Duration DEFAULT_ELECTION_TIMEOUT = Duration.ofMillis(2500);
private static final Duration DEFAULT_HEARTBEAT_INTERVAL = Duration.ofMillis(250);
private static final boolean DEFAULT_PRIORITY_ELECTION = false;
private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(5);

private Set<String> members = new HashSet<>();
private int partitionSize;
Expand All @@ -41,6 +42,7 @@ public class RaftPartitionGroupConfig extends PartitionGroupConfig<RaftPartition
private int maxAppendsPerFollower = 2;
private int maxAppendBatchSize = 32 * 1024;
private boolean priorityElectionEnabled = DEFAULT_PRIORITY_ELECTION;
private Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT;

@Optional("EntryValidator")
private EntryValidator entryValidator = new NoopEntryValidator();
Expand Down Expand Up @@ -187,7 +189,7 @@ public void setMaxAppendBatchSize(final int maxAppendBatchSize) {
}

@Override
public PartitionGroup.Type getType() {
public Type getType() {
return RaftPartitionGroup.TYPE;
}

Expand All @@ -198,4 +200,12 @@ public boolean isPriorityElectionEnabled() {
public void setPriorityElectionEnabled(final boolean enable) {
priorityElectionEnabled = enable;
}

public Duration getRequestTimeout() {
return requestTimeout;
}

public void setRequestTimeout(final Duration requestTimeout) {
this.requestTimeout = requestTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -72,6 +73,7 @@ public class RaftPartitionServer implements Managed<RaftPartitionServer>, Health
new CopyOnWriteArraySet<>();
private final Set<FailureListener> deferredFailureListeners = new CopyOnWriteArraySet<>();
private final PartitionMetadata partitionMetadata;
private final Duration requestTimeout;

private RaftServer server;
private ReceivableSnapshotStore persistedSnapshotStore;
Expand All @@ -93,6 +95,7 @@ public RaftPartitionServer(
getClass(),
LoggerContext.builder(RaftPartitionServer.class).addValue(partition.name()).build());
this.partitionMetadata = partitionMetadata;
requestTimeout = config.getRequestTimeout();
}

@Override
Expand Down Expand Up @@ -306,7 +309,10 @@ private RaftStorage createRaftStorage() {

private RaftServerCommunicator createServerProtocol() {
return new RaftServerCommunicator(
partition.name(), Serializer.using(RaftNamespaces.RAFT_PROTOCOL), clusterCommunicator);
partition.name(),
Serializer.using(RaftNamespaces.RAFT_PROTOCOL),
clusterCommunicator,
requestTimeout);
}

public CompletableFuture<Void> stepDown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,24 @@
/** Raft server protocol that uses a {@link ClusterCommunicationService}. */
public class RaftServerCommunicator implements RaftServerProtocol {

private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(5);
private final RaftMessageContext context;
private final Serializer serializer;
private final ClusterCommunicationService clusterCommunicator;
private final String partitionName;
private final RaftRequestMetrics metrics;
private final Duration requestTimeout;

public RaftServerCommunicator(
final String prefix,
final Serializer serializer,
final ClusterCommunicationService clusterCommunicator) {
final ClusterCommunicationService clusterCommunicator,
final Duration requestTimeout) {
context = new RaftMessageContext(prefix);
partitionName = prefix;
this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null");
this.clusterCommunicator =
Preconditions.checkNotNull(clusterCommunicator, "clusterCommunicator cannot be null");
this.requestTimeout = requestTimeout;
metrics = new RaftRequestMetrics(partitionName);
}

Expand Down Expand Up @@ -217,7 +219,7 @@ private <T, U> CompletableFuture<U> sendAndReceive(
serializer::encode,
serializer::decode,
MemberId.from(memberId.id()),
REQUEST_TIMEOUT);
requestTimeout);
}

private <T extends RaftMessage> T recordReceivedMetrics(final T m) {
Expand Down

0 comments on commit 9bc3ab3

Please sign in to comment.