Skip to content

Commit

Permalink
avoid broker config normalize and allow -ve value
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Apr 18, 2024
1 parent 893a9de commit 358bff4
Showing 1 changed file with 13 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
}

private Integer normalizeValue(Integer policyValue) {
return policyValue != null && policyValue <= 0 ? null : policyValue;
return policyValue != null && policyValue < 0 ? null : policyValue;
}

private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
Expand Down Expand Up @@ -376,21 +376,18 @@ private void updateTopicPolicyByBrokerConfig() {
config.isBrokerDeleteInactiveTopicsEnabled()));

updateBrokerSubscriptionTypesEnabled();
topicPolicies.getMaxSubscriptionsPerTopic()
.updateBrokerValue(normalizeValue(config.getMaxSubscriptionsPerTopic()));
topicPolicies.getMaxProducersPerTopic().updateBrokerValue(normalizeValue(config.getMaxProducersPerTopic()));
topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(normalizeValue(config.getMaxConsumersPerTopic()));
topicPolicies.getMaxConsumersPerSubscription()
.updateBrokerValue(normalizeValue(config.getMaxConsumersPerSubscription()));
topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxProducersPerTopic().updateBrokerValue(config.getMaxProducersPerTopic());
topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(config.getMaxConsumersPerTopic());
topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(config.getMaxConsumersPerSubscription());
topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());
topicPolicies.getRetentionPolicies().updateBrokerValue(new RetentionPolicies(
config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateBrokerValue(
config.getBrokerDeduplicationSnapshotIntervalSeconds());
topicPolicies.getMaxUnackedMessagesOnConsumer()
.updateBrokerValue(normalizeValue(config.getMaxUnackedMessagesPerConsumer()));
topicPolicies.getRetentionPolicies().updateBrokerValue(
new RetentionPolicies(config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
topicPolicies.getDeduplicationSnapshotIntervalSeconds()
.updateBrokerValue(config.getBrokerDeduplicationSnapshotIntervalSeconds());
topicPolicies.getMaxUnackedMessagesOnConsumer().updateBrokerValue(config.getMaxUnackedMessagesPerConsumer());
topicPolicies.getMaxUnackedMessagesOnSubscription()
.updateBrokerValue(normalizeValue(config.getMaxUnackedMessagesPerSubscription()));
.updateBrokerValue(config.getMaxUnackedMessagesPerSubscription());
//init backlogQuota
topicPolicies.getBackLogQuotaMap()
.get(BacklogQuota.BacklogQuotaType.destination_storage)
Expand All @@ -399,9 +396,8 @@ private void updateTopicPolicyByBrokerConfig() {
.get(BacklogQuota.BacklogQuotaType.message_age)
.updateBrokerValue(brokerService.getBacklogQuotaManager().getDefaultQuota());

topicPolicies.getTopicMaxMessageSize().updateBrokerValue(normalizeValue(config.getMaxMessageSize()));
topicPolicies.getMessageTTLInSeconds()
.updateBrokerValue(normalizeValue(config.getTtlDurationDefaultInSeconds()));
topicPolicies.getTopicMaxMessageSize().updateBrokerValue(config.getMaxMessageSize());
topicPolicies.getMessageTTLInSeconds().updateBrokerValue(config.getTtlDurationDefaultInSeconds());
topicPolicies.getPublishRate().updateBrokerValue(publishRateInBroker(config));
topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
Expand Down

0 comments on commit 358bff4

Please sign in to comment.