Skip to content

Commit

Permalink
[fix][broker]fix the wrong type and default value of maxUnackedMessag…
Browse files Browse the repository at this point in the history
…esPerSubscriptionOnBrokerBlocked
  • Loading branch information
aloyszhang committed Dec 20, 2023
1 parent 69a45a1 commit 8764b5c
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 17 deletions.
2 changes: 1 addition & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ maxUnackedMessagesPerBroker=0
# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
# than this percentage limit and subscription will not receive any new messages until that subscription acks back
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=16

# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
unblockStuckSubscriptionEnabled=false
Expand Down
2 changes: 1 addition & 1 deletion conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ maxUnackedMessagesPerBroker=0
# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
# than this percentage limit and subscription will not receive any new messages until that subscription acks back
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=16

# Tick time to schedule task that checks topic publish rate limiting across all topics
# Reducing to lower value can give more accuracy while throttling publish but
Expand Down
2 changes: 1 addition & 1 deletion deployment/terraform-ansible/templates/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ maxUnackedMessagesPerBroker=0
# Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher unacked messages
# than this percentage limit and subscription will not receive any new messages until that subscription acks back
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=16

# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
unblockStuckSubscriptionEnabled=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
doc = "Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher "
+ " unacked messages than this percentage limit and subscription will not receive any new messages "
+ " until that subscription acks back `limit/2` messages")
private double maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 0.16;
private int maxUnackedMessagesPerSubscriptionOnBrokerBlocked = 16;
@FieldContext(
category = CATEGORY_POLICIES,
doc = "Maximum size of Consumer metadata")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,10 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.topicLoadRequestSemaphore = new AtomicReference<Semaphore>(
new Semaphore(pulsar.getConfiguration().getMaxConcurrentTopicLoadRequest(), false));
if (pulsar.getConfiguration().getMaxUnackedMessagesPerBroker() > 0
&& pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() > 0.0) {
&& pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() > 0) {
this.maxUnackedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerBroker();
this.maxUnackedMsgsPerDispatcher = (int) ((maxUnackedMessages
* pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked()) / 100);
this.maxUnackedMsgsPerDispatcher = maxUnackedMessages
* pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() / 100;
log.info("Enabling per-broker unack-message limit {} and dispatcher-limit {} on blocked-broker",
maxUnackedMessages, maxUnackedMsgsPerDispatcher);
// block misbehaving dispatcher by checking periodically
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ public void testBlockBrokerDispatching() {
List<Long> timestamps = new ArrayList<>();
timestamps.add(System.currentTimeMillis());
int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerBroker();
double unAckedMessagePercentage = pulsar.getConfiguration()
int unAckedMessagePercentage = pulsar.getConfiguration()
.getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked();

@Cleanup("shutdownNow")
Expand All @@ -692,10 +692,9 @@ public void testBlockBrokerDispatching() {
try {
final int waitMills = 500;
final int maxUnAckPerBroker = 200;
final double unAckMsgPercentagePerDispatcher = 10;
int maxUnAckPerDispatcher = (int) ((maxUnAckPerBroker * unAckMsgPercentagePerDispatcher) / 100); // 200 *
// 10% = 20
// messages
final int unAckMsgPercentagePerDispatcher = 10;
// 200 * 10% = 20 messages
int maxUnAckPerDispatcher = maxUnAckPerBroker * unAckMsgPercentagePerDispatcher / 100;
pulsar.getConfiguration().setMaxUnackedMessagesPerBroker(maxUnAckPerBroker);
pulsar.getConfiguration()
.setMaxUnackedMessagesPerSubscriptionOnBrokerBlocked(unAckMsgPercentagePerDispatcher);
Expand Down Expand Up @@ -903,14 +902,13 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerBroker();
double unAckedMessagePercentage = pulsar.getConfiguration()
int unAckedMessagePercentage = pulsar.getConfiguration()
.getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked();
try {
final int maxUnAckPerBroker = 200;
final double unAckMsgPercentagePerDispatcher = 10;
int maxUnAckPerDispatcher = (int) ((maxUnAckPerBroker * unAckMsgPercentagePerDispatcher) / 100); // 200 *
// 10% = 20
// messages
final int unAckMsgPercentagePerDispatcher = 10;
// 200 *10% = 20 messages
int maxUnAckPerDispatcher = maxUnAckPerBroker * unAckMsgPercentagePerDispatcher / 100;
pulsar.getConfiguration().setMaxUnackedMessagesPerBroker(maxUnAckPerBroker);
pulsar.getConfiguration()
.setMaxUnackedMessagesPerSubscriptionOnBrokerBlocked(unAckMsgPercentagePerDispatcher);
Expand Down

0 comments on commit 8764b5c

Please sign in to comment.