-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][client] fix negative message re-delivery twice issue #20750
Conversation
@aloyszhang Please add the following content to your PR description and select a checkbox:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice catch!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getUnAckedMessageTracker
was a public method of ConsumerImpl
and MultiTopicsConsumerImpl
before this change, if you moved it to ConsumerBase
and changed it to protected
, there will be a breaking change.
It would be better to add back the getter to ConsumerImpl
and MultiTopicsConsumerImpl
. i.e. add the following code to them
@Override
public UnAckedMessageTracker getUnAckedMessageTracker() {
return super.getUnAckedMessageTracker();
}
…nAckedMessageTracker
Codecov Report
@@ Coverage Diff @@
## master #20750 +/- ##
=============================================
+ Coverage 31.96% 73.09% +41.13%
- Complexity 11812 32141 +20329
=============================================
Files 1499 1867 +368
Lines 114819 139081 +24262
Branches 12454 15306 +2852
=============================================
+ Hits 36697 101659 +64962
+ Misses 73260 29353 -43907
- Partials 4862 8069 +3207
Flags with carried forward coverage won't be shown. Click here to find out more.
|
@codelipenghui @Technoboy- This is a bug fix and no public API is added. So it should be cherry-picked to other branches, right? |
(cherry picked from commit ecd16d6)
) (cherry picked from commit ecd16d6)
(cherry picked from commit 05ac1f9)
(cherry picked from commit ecd16d6)
Thank you @izumo27 for noticing that this fix is missing from Pulsar 3.0.6 release (comment). Unfortunately this fix was missing from branch-3.0 and wasn't delivered in Pulsar 3.0.3 as it was planned. I have cherry-picked this to branch-3.0 and this fix will be delivered in Pulsar 3.0.7 release for 3.0.x . |
Motivation
When calling the
negativeAcknowledge
method, the message is supposed to be re-delivered only one time.But, actually, it may be re-delivered twice.
There are two problems:
MultiTopicsConsumerImpl
, when callingnegativeAcknowledge
, we didn't remove the message fromUnAckedMessageTracker
, so this message will be re-delivery by bothUnAckedMessageTracker
andNegativeAcksTracker
ConsumerImpl
, when callingnegativeAcknowledge
, if the message id is a BatchMessageIdImpl, the messageId will be not removed from theUnAckedMessageTracker
, there is because the removing MessageId type(BatchMessageIdImpl) is not the MessageId type of add(MessageIdImpl).See:
UnAckedMessageTracker
UnAckedMessageTracker
Modifications
MultiTopicsConsumerImpl
, remove the message id fromUnAckedMessageTracker
when callingnegativeAcknowledge
UnAckedMessageTracker
UnAckedMessageTracker
size check forNegativeAcksTest
NegativeAcksTracker
Verifying this change
Documentation
doc-not-needed
Matching PR in forked repository
PR in forked repository: aloyszhang#17