Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Multiple consumer dispatcher stuck when unackedMessages greater than maxUnackedMessages #17483

Merged
merged 17 commits into from
Sep 10, 2022
Merged

Conversation

mattisonchao
Copy link
Member

Motivation

See error stack trace:

[Unreachable] char[1024] “2022-08-31T08:05:02,202+0000 [BookKeeperClientWorker-OrderedExecutor-0-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Unknown exception for ManagedLedgerException.
java.lang.ArrayIndexOutOfBoundsException: -22012
	at java.util.ArrayList.elementData(ArrayList.java:424) ~[?:1.8.0_322]
	at java.util.ArrayList.get(ArrayList.java:437) ~[?:1.8.0_322]
	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:256)
	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:517)
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$12.readEntryComplete(ManagedCursorImpl.java:1392)
	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.”

Relative code:

int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0);
if (consumer != null && consumer.getMaxUnackedMessages() > 0) {
availablePermits = Math.min(availablePermits,
consumer.getMaxUnackedMessages() - consumer.getUnackedMessages());
}
int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC,
readType, consumerStickyKeyHashesMap.get(consumer));
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} with messages num {}, read type is {}",
name, consumer == null ? "null" : consumer.consumerName(), messagesForC, readType);
}
if (messagesForC < entriesWithSameKeyCount) {
// We are not able to push all the messages with given key to its consumer,
// so we discard for now and mark them for later redelivery
for (int i = messagesForC; i < entriesWithSameKeyCount; i++) {
Entry entry = entriesWithSameKey.get(i);
long stickyKeyHash = getStickyKeyHash(entry);
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
entriesWithSameKey.set(i, null);
}
}
if (messagesForC > 0) {

When unackedMessages is greater than maxUnackedMessages, because messagesForC is a negative number, ArrayIndexOutOfBoundsException will be thrown on line 254. Then the exception will cause the message not to be delivered.

Modifications

  • Add check to avoid negative results

Verifying this change

  • Make sure that the change passes the CI checks.

Documentation

  • doc-not-needed
    (Please explain why)

@mattisonchao mattisonchao added this to the 2.12.0 milestone Sep 6, 2022
@mattisonchao mattisonchao self-assigned this Sep 6, 2022
@mattisonchao mattisonchao changed the title [fix][broker] StickyKey Dispatcher stuck when unackedMessages greater than maxUnackedMessages [fix][broker] Sticky key dispatcher stuck when unackedMessages greater than maxUnackedMessages Sep 6, 2022
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 6, 2022
@lhotari
Copy link
Member

lhotari commented Sep 6, 2022

at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:256)

which Pulsar version is this stack trace from?

Would it be possible to add a test that reproduces the problem using the public APIs? If this happens when there are more unknowledged messages than the defined limit, I'd assume that it's fairly easy to reproduce?

@codelipenghui
Copy link
Contributor

@mattisonchao I think the issue is introduced by #16718? it should be a blocker for 2.11.0 release.

@codelipenghui codelipenghui modified the milestones: 2.12.0, 2.11.0 Sep 6, 2022
@codelipenghui codelipenghui added release/2.10.2 release/blocker Indicate the PR or issue that should block the release until it gets resolved type/bug The PR fixed a bug or issue reported a bug and removed release/2.11.1 release/2.10.3 labels Sep 6, 2022
@mattisonchao
Copy link
Member Author

@lhotari Test added, Please take a look.

Copy link
Member

@michaeljmarshall michaeljmarshall left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The solution makes sense to me, but I'm wondering if there is another bug that we need to fix too? Thanks!

@@ -634,7 +634,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
// round-robin dispatch batch size for this consumer
int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
if (c.getMaxUnackedMessages() > 0) {
availablePermits = Math.min(availablePermits, c.getMaxUnackedMessages() - c.getUnackedMessages());
// Avoid negative number
int remainUnAckedMessages = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we understand why c.getUnackedMessages() is greater than c.getMaxUnackedMessages()? I read through the test, but I didn't see what is causing the negative availablePermits.

Based on the names, that seems like there is another bug if c.getMaxUnackedMessages() - c.getUnackedMessages() is negative. Perhaps the idea that the maxUnackedMessages is just a soft limit that we shouldn't exceed by too much?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we understand why c.getUnackedMessages() is greater than c.getMaxUnackedMessages()?

Because in the current implementation, we will add unackedMessages after sent messages. if we use batch messages, the unackedMessages is probably greater than MaxUnackedMessages.

Plus, #16670 #16718 want to add this check to limit the consumer.

Based on the names, that seems like there is another bug if c.getMaxUnackedMessages() - c.getUnackedMessages() is negative.

Sure, But I find we have two sections using it. And I fix It all in this PR.

Perhaps the idea that the maxUnackedMessages is just a soft limit that we shouldn't exceed by too much?

Yes, maybe we have another method to help more robust support it, but it looks like it is another improvement.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because in the current implementation, we will add unackedMessages after sent messages. if we use batch messages, the unackedMessages is probably greater than MaxUnackedMessages.

This was the detail I was missing, thank you for explaining. I read through the code a bit more, and it makes sense.

@mattisonchao mattisonchao changed the title [fix][broker] Sticky key dispatcher stuck when unackedMessages greater than maxUnackedMessages [fix][broker] Multiple consumer dispatcher stuck when unackedMessages greater than maxUnackedMessages Sep 9, 2022
Copy link
Contributor

@congbobo184 congbobo184 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not carry this fix in master, the test for share sub mode can passed

Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mattisonchao mattisonchao merged commit d7943a5 into apache:master Sep 10, 2022
@mattisonchao mattisonchao deleted the fix_nagative_number branch September 10, 2022 13:10
codelipenghui pushed a commit that referenced this pull request Sep 12, 2022
…s` greater than `maxUnackedMessages` (#17483)

(cherry picked from commit d7943a5)
codelipenghui pushed a commit that referenced this pull request Sep 12, 2022
…s` greater than `maxUnackedMessages` (#17483)

(cherry picked from commit d7943a5)
codelipenghui pushed a commit that referenced this pull request Sep 12, 2022
…s` greater than `maxUnackedMessages` (#17483)

(cherry picked from commit d7943a5)
@mattisonchao mattisonchao removed the release/blocker Indicate the PR or issue that should block the release until it gets resolved label Sep 13, 2022
tisonkun pushed a commit to tisonkun/pulsar that referenced this pull request Sep 14, 2022
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Sep 16, 2022
…s` greater than `maxUnackedMessages` (apache#17483)

(cherry picked from commit d7943a5)
(cherry picked from commit 011e121)
dragonls pushed a commit to dragonls/pulsar that referenced this pull request Oct 21, 2022
dragonls pushed a commit to dragonls/pulsar that referenced this pull request Oct 21, 2022
…release' (merge request !67)

branch-2.9-fix-somebug-key-share
[fix][broker] Multiple consumer dispatcher stuck when unackedMessages greater than maxUnackedMessages (apache#17483)
[fix][broker] Fix consumer does not abide by the max unacks limitation for Key_Shared subscription apache#16718
[fix][broker] Fix consumer does not abide by the max unacks limitation for Shared subscription apache#16670
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/broker cherry-picked/branch-2.9 Archived: 2.9 is end of life cherry-picked/branch-2.10 cherry-picked/branch-2.11 doc-not-needed Your PR changes do not impact docs release/2.9.4 release/2.10.2 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants