Skip to content

Commit

Permalink
merge: #8568
Browse files Browse the repository at this point in the history
8568: [Backport stable/1.3] Configurable threshold for preferring snapshot replication r=oleschoenburg a=github-actions[bot]

# Description
Backport of #8562 to `stable/1.3`.

relates to #7968

Co-authored-by: Ole Schönburg <[email protected]>
  • Loading branch information
zeebe-bors-cloud[bot] and lenaschoenburg authored Jan 11, 2022
2 parents 0730107 + c1bcfe7 commit 9f6c33e
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,22 @@ public Builder withPartitionDistributor(final PartitionDistributor partitionDist
return this;
}

/**
* Sets the threshold for preferring snapshot replication. The unit is <i>number of records</i>
* by which a follower may lag behind before the leader starts to prefer replicating snapshots
* instead of records.
*
* @param preferSnapshotReplicationThreshold the threshold to use
* @return this builder for chaining
*/
public Builder withPreferSnapshotReplicationThreshold(
final int preferSnapshotReplicationThreshold) {
config
.getPartitionConfig()
.setPreferSnapshotReplicationThreshold(preferSnapshotReplicationThreshold);
return this;
}

@Override
public RaftPartitionGroup build() {
return new RaftPartitionGroup(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ RaftPartitionGroup buildRaftPartitionGroup(
.withHeartbeatInterval(clusterCfg.getHeartbeatInterval())
.withRequestTimeout(experimentalCfg.getRaft().getRequestTimeout())
.withMaxQuorumResponseTimeout(experimentalCfg.getRaft().getMaxQuorumResponseTimeout())
.withMinStepDownFailureCount(experimentalCfg.getRaft().getMinStepDownFailureCount());
.withMinStepDownFailureCount(experimentalCfg.getRaft().getMinStepDownFailureCount())
.withPreferSnapshotReplicationThreshold(
experimentalCfg.getRaft().getPreferSnapshotReplicationThreshold());

final int maxMessageSize = (int) networkCfg.getMaxMessageSizeInBytes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ public final class ExperimentalRaftCfg implements ConfigurationEntry {
private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(5);
private static final Duration DEFAULT_MAX_QUORUM_RESPONSE_TIMEOUT = Duration.ofSeconds(0);
private static final int DEFAULT_MIN_STEP_DOWN_FAILURE_COUNT = 3;
private static final int DEFAULT_PREFER_SNAPSHOT_REPLICATION_THRESHOLD = 100;

private Duration requestTimeout = DEFAULT_REQUEST_TIMEOUT;
private Duration maxQuorumResponseTimeout = DEFAULT_MAX_QUORUM_RESPONSE_TIMEOUT;
private int minStepDownFailureCount = DEFAULT_MIN_STEP_DOWN_FAILURE_COUNT;
private int preferSnapshotReplicationThreshold = DEFAULT_PREFER_SNAPSHOT_REPLICATION_THRESHOLD;

public Duration getRequestTimeout() {
return requestTimeout;
Expand All @@ -42,4 +44,12 @@ public int getMinStepDownFailureCount() {
public void setMinStepDownFailureCount(final int minStepDownFailureCount) {
this.minStepDownFailureCount = minStepDownFailureCount;
}

public int getPreferSnapshotReplicationThreshold() {
return preferSnapshotReplicationThreshold;
}

public void setPreferSnapshotReplicationThreshold(final int preferSnapshotReplicationThreshold) {
this.preferSnapshotReplicationThreshold = preferSnapshotReplicationThreshold;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,18 @@ void shouldDisablePriorityElection() {
assertThat(config.getPartitionConfig().isPriorityElectionEnabled()).isFalse();
}

@Test
void shouldSetPreferSnapshotReplicationThreshold() {
// given
brokerCfg.getExperimental().getRaft().setPreferSnapshotReplicationThreshold(1000);

// when
final var config = buildRaftPartitionGroup();

// then
assertThat(config.getPartitionConfig().getPreferSnapshotReplicationThreshold()).isEqualTo(1000);
}

private RaftPartitionGroupConfig buildRaftPartitionGroup() {
final var partitionGroup = factory.buildRaftPartitionGroup(brokerCfg, SNAPSHOT_STORE_FACTORY);
return (RaftPartitionGroupConfig) partitionGroup.config();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,27 @@ public void shouldSetRaftMinStepDownFailureCountFromEnv() {
// then
assertThat(raft.getMinStepDownFailureCount()).isEqualTo(10);
}

@Test
public void shouldSetPreferSnapshotReplicationThresholdFromConfig() {
// when
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);
final var raft = cfg.getExperimental().getRaft();

// then
assertThat(raft.getPreferSnapshotReplicationThreshold()).isEqualTo(500);
}

@Test
public void shouldSetPreferSnapshotReplicationThresholdFromEnv() {
// given
environment.put("zeebe.broker.experimental.raft.preferSnapshotReplicationThreshold", "10");

// when
final BrokerCfg cfg = TestConfigReader.readConfig("experimental-cfg", environment);
final var raft = cfg.getExperimental().getRaft();

// then
assertThat(raft.getPreferSnapshotReplicationThreshold()).isEqualTo(10);
}
}
1 change: 1 addition & 0 deletions broker/src/test/resources/system/experimental-cfg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ zeebe:
requestTimeout: 10s
maxQuorumResponseTimeout: 8s
minStepDownFailureCount: 5
preferSnapshotReplicationThreshold: 500
queryApi:
enabled: true
6 changes: 6 additions & 0 deletions dist/src/main/config/broker.standalone.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,12 @@
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_RAFT_MAXQUORUMRESPONSETIMEOUT
# maxQuorumResponseTimeout = 0ms

# Threshold used by the leader to decide between replicating a snapshot or records.
# The unit is number of records by which the follower may lag behind before the leader
# prefers replicating snapshots instead of records.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_RAFT_PREFERSNAPSHOTREPLICATIONTHRESHOLD.
# preferSnapshotReplicationThreshold = 100

# Allows to configure RocksDB properties, which is used for state management.
# rocksdb:
# Specify custom column family options overwriting Zeebe's own defaults.
Expand Down
6 changes: 6 additions & 0 deletions dist/src/main/config/broker.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,12 @@
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_RAFT_MAXQUORUMRESPONSETIMEOUT
# maxQuorumResponseTimeout = 0ms

# Threshold used by the leader to decide between replicating a snapshot or records.
# The unit is number of records by which the follower may lag behind before the leader
# prefers replicating snapshots instead of records.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_EXPERIMENTAL_RAFT_PREFERSNAPSHOTREPLICATIONTHRESHOLD.
# preferSnapshotReplicationThreshold = 100

# Allows to configure RocksDB properties, which is used for state management.
# rocksdb:
# Specify custom column family options overwriting Zeebe's own defaults.
Expand Down

0 comments on commit 9f6c33e

Please sign in to comment.