-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix] Fix Reader can be stuck from transaction aborted messages. #22610
Conversation
hi, @tjiuming Thanks for your PR. Maybe we can change this test to cover all related transactions get lastmessage id case. Line 264 in a761b97
Change to: @Test
public void testGetLastMessageIdsWithOngoingTransactions() throws Exception {
// 1. Prepare environment
String topic = "persistent://" + NAMESPACE1 + "/testGetLastMessageIdsWithOngoingTransactions";
String subName = "my-subscription";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscribe();
// 2. Test last max read position can be required correctly.
// 2.1 Case1: send 3 original messages. |1:0|1:1|1:2|
MessageIdImpl expectedLastMessageID = null;
for (int i = 0; i < 3; i++) {
expectedLastMessageID = (MessageIdImpl) producer.newMessage().send();
}
assertMessageId(consumer, expectedLastMessageID);
// 2.2 Case2: send 2 ongoing transactional messages and 2 original messages.
// |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5.
Transaction txn1 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
.get();
Transaction txn2 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
.get();
// |1:0|1:1|1:2|txn1:1:3|
producer.newMessage(txn1).send();
// |1:0|1:1|1:2|txn1:1:3|1:4|
MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send();
// |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|
producer.newMessage(txn2).send();
// 2.2.1 Last message ID will not change when txn1 and txn2 do not end.
assertMessageId(consumer, expectedLastMessageID);
// 2.2.2 Last message ID will update to 1:4 when txn1 committed.
// |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|
txn1.commit().get(5, TimeUnit.SECONDS);
assertMessageId(consumer, expectedLastMessageID1);
// 2.2.3 Last message ID will still to 1:4 when txn2 aborted.
// |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|tx2:abort->1:7|
txn2.abort().get(5, TimeUnit.SECONDS);
assertMessageId(consumer, expectedLastMessageID1);
} |
@shibd Hi baodi, I've addressed your comment, thanks! |
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems we don't need maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0
judgment condition, since maxReadPosition
also may not be a valid position for transaction topic event if maxReadPosition < lastPosition
.
I don't understand, why? |
@dao-jun See: Lines 447 to 459 in 264722f
|
@coderzc Oh, I understand you, if the maxReadPosition is also an aborted message, the reader can be stuck too. It makes sense, I'll improve test to cover the case. |
@coderzc PTAL |
@codelipenghui @shibd @coderzc It looks like Shared/key_Shared Consumer also could be stuck from delayed delivery messages, WDYT? Do we need to handle it? |
I prefer case by case fix it and adding enough unit tests to cover every case. |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #22610 +/- ##
============================================
- Coverage 73.57% 72.71% -0.86%
+ Complexity 32624 32532 -92
============================================
Files 1877 1887 +10
Lines 139502 141004 +1502
Branches 15299 15477 +178
============================================
- Hits 102638 102534 -104
- Misses 28908 30603 +1695
+ Partials 7956 7867 -89
Flags with carried forward coverage won't be shown. Click here to find out more.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding #22572 (comment),
to further reduce code duplication, start internalAsyncReverseFindPositionOneByOne
method with this part
if (!ledger.isValidPosition(previousPosition)) {
future.complete(previousPosition);
return;
}
and then this logic can be removed from asyncGetLastValidPosition
and the readEntryComplete
callback.
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
Outdated
Show resolved
Hide resolved
addressed |
@lhotari PTAL |
) (cherry picked from commit 7e88463)
…che#22610) (cherry picked from commit 7e88463) (cherry picked from commit f516a85)
…che#22610) (cherry picked from commit 7e88463) (cherry picked from commit f516a85)
Motivation
Fix Reader can be stuck from transaction aborted messages.
Related to #22572
Since Reader's SubscriptionType is
Exclusive
, so we no need to handle DelayedDelivery messages.Modifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: