Skip to content

Commit

Permalink
[fix][broker] Fix broken topic policy implementation compatibility wi…
Browse files Browse the repository at this point in the history
…th old pulsar version (apache#22535)

(cherry picked from commit 59daac6)
  • Loading branch information
rdhabalia authored and lhotari committed Apr 22, 2024
1 parent beb147c commit 8439082
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,13 +216,16 @@ protected void updateTopicPolicy(TopicPolicies data) {
.updateTopicValue(formatSchemaCompatibilityStrategy(data.getSchemaCompatibilityStrategy()));
}
topicPolicies.getRetentionPolicies().updateTopicValue(data.getRetentionPolicies());
topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic());
topicPolicies.getMaxUnackedMessagesOnConsumer().updateTopicValue(data.getMaxUnackedMessagesOnConsumer());
topicPolicies.getMaxSubscriptionsPerTopic()
.updateTopicValue(normalizeValue(data.getMaxSubscriptionsPerTopic()));
topicPolicies.getMaxUnackedMessagesOnConsumer()
.updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnConsumer()));
topicPolicies.getMaxUnackedMessagesOnSubscription()
.updateTopicValue(data.getMaxUnackedMessagesOnSubscription());
topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic());
topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic());
topicPolicies.getMaxConsumersPerSubscription().updateTopicValue(data.getMaxConsumersPerSubscription());
.updateTopicValue(normalizeValue(data.getMaxUnackedMessagesOnSubscription()));
topicPolicies.getMaxProducersPerTopic().updateTopicValue(normalizeValue(data.getMaxProducerPerTopic()));
topicPolicies.getMaxConsumerPerTopic().updateTopicValue(normalizeValue(data.getMaxConsumerPerTopic()));
topicPolicies.getMaxConsumersPerSubscription()
.updateTopicValue(normalizeValue(data.getMaxConsumersPerSubscription()));
topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies());
topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled());
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateTopicValue(
Expand All @@ -233,8 +236,8 @@ protected void updateTopicPolicy(TopicPolicies data) {
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(type ->
this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString())));
topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize()));
topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds()));
topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
topicPolicies.getReplicatorDispatchRate().updateTopicValue(
Expand All @@ -261,15 +264,19 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getReplicationClusters().updateNamespaceValue(
new ArrayList<>(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
topicPolicies.getMaxUnackedMessagesOnConsumer()
.updateNamespaceValue(namespacePolicies.max_unacked_messages_per_consumer);
.updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_consumer));
topicPolicies.getMaxUnackedMessagesOnSubscription()
.updateNamespaceValue(namespacePolicies.max_unacked_messages_per_subscription);
topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic);
.updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_subscription));
topicPolicies.getMessageTTLInSeconds()
.updateNamespaceValue(normalizeValue(namespacePolicies.message_ttl_in_seconds));
topicPolicies.getMaxSubscriptionsPerTopic()
.updateNamespaceValue(normalizeValue(namespacePolicies.max_subscriptions_per_topic));
topicPolicies.getMaxProducersPerTopic()
.updateNamespaceValue(normalizeValue(namespacePolicies.max_producers_per_topic));
topicPolicies.getMaxConsumerPerTopic()
.updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_topic));
topicPolicies.getMaxConsumersPerSubscription()
.updateNamespaceValue(namespacePolicies.max_consumers_per_subscription);
.updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_subscription));
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue(
Expand Down Expand Up @@ -299,6 +306,10 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
updateEntryFilters();
}

private Integer normalizeValue(Integer policyValue) {
return policyValue != null && policyValue < 0 ? null : policyValue;
}

private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
DispatchRateImpl dispatchRate = namespacePolicies.topicDispatchRate.get(cluster);
if (dispatchRate == null) {
Expand Down Expand Up @@ -357,12 +368,11 @@ private void updateTopicPolicyByBrokerConfig() {
topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(config.getMaxConsumersPerTopic());
topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(config.getMaxConsumersPerSubscription());
topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled());
topicPolicies.getRetentionPolicies().updateBrokerValue(new RetentionPolicies(
config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateBrokerValue(
config.getBrokerDeduplicationSnapshotIntervalSeconds());
topicPolicies.getMaxUnackedMessagesOnConsumer()
.updateBrokerValue(config.getMaxUnackedMessagesPerConsumer());
topicPolicies.getRetentionPolicies().updateBrokerValue(
new RetentionPolicies(config.getDefaultRetentionTimeInMinutes(), config.getDefaultRetentionSizeInMB()));
topicPolicies.getDeduplicationSnapshotIntervalSeconds()
.updateBrokerValue(config.getBrokerDeduplicationSnapshotIntervalSeconds());
topicPolicies.getMaxUnackedMessagesOnConsumer().updateBrokerValue(config.getMaxUnackedMessagesPerConsumer());
topicPolicies.getMaxUnackedMessagesOnSubscription()
.updateBrokerValue(config.getMaxUnackedMessagesPerSubscription());
//init backlogQuota
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.Policies;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import lombok.Cleanup;
import java.util.Optional;
import java.util.Properties;

Expand Down Expand Up @@ -112,4 +116,40 @@ public void testNamespaceServicePulsarClientConfiguration() {
Assert.assertEquals(clientConf.getMemoryLimitBytes(), 100000);
}

@Test
public void testOldNamespacePolicy() throws Exception {

String ns = "prop/oldNsWithDefaultNonNullValues";
String topic = "persistent://" + ns + "/t1";
Policies policies = new Policies();
policies.max_consumers_per_subscription = -1;
policies.max_consumers_per_topic = -1;
policies.max_producers_per_topic = -1;
policies.max_subscriptions_per_topic = -1;
policies.max_topics_per_namespace = -1;
policies.max_unacked_messages_per_consumer = -1;
policies.max_unacked_messages_per_subscription = -1;
admin.namespaces().createNamespace(ns, policies);

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic).create();
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnSubscription().get(),
conf.getMaxUnackedMessagesPerSubscription());
assertEquals(topicRef.topicPolicies.getMaxConsumersPerSubscription().get(),
conf.getMaxConsumersPerSubscription());
assertEquals(topicRef.topicPolicies.getMaxConsumerPerTopic().get(),
conf.getMaxConsumersPerTopic());
assertEquals(topicRef.topicPolicies.getMaxProducersPerTopic().get(),
conf.getMaxProducersPerTopic());
assertEquals(topicRef.topicPolicies.getMaxSubscriptionsPerTopic().get(),
conf.getMaxSubscriptionsPerTopic());
assertEquals(topicRef.topicPolicies.getTopicMaxMessageSize().get(),
conf.getMaxMessageSize());
assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnConsumer().get(),
conf.getMaxUnackedMessagesPerConsumer());


}
}

0 comments on commit 8439082

Please sign in to comment.