Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Fix Reader can be stuck from transaction aborted messages. #22610

Merged
merged 9 commits into from
May 7, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ public static CompletableFuture<Position> asyncGetLastValidPosition(final Manage
final Predicate<Entry> predicate,
final PositionImpl startPosition) {
CompletableFuture<Position> future = new CompletableFuture<>();
if (!ledger.isValidPosition(startPosition)) {
future.complete(startPosition);
} else {
internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future);
}
internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future);
return future;
}

private static void internalAsyncReverseFindPositionOneByOne(final ManagedLedgerImpl ledger,
final Predicate<Entry> predicate,
final PositionImpl position,
final CompletableFuture<Position> future) {
if (!ledger.isValidPosition(position)) {
future.complete(position);
return;
}
ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
Expand All @@ -60,12 +60,7 @@ public void readEntryComplete(Entry entry, Object ctx) {
return;
}
PositionImpl previousPosition = ledger.getPreviousPosition((PositionImpl) position);
if (!ledger.isValidPosition(previousPosition)) {
future.complete(previousPosition);
} else {
internalAsyncReverseFindPositionOneByOne(ledger, predicate,
ledger.getPreviousPosition((PositionImpl) position), future);
}
internalAsyncReverseFindPositionOneByOne(ledger, predicate, previousPosition, future);
} catch (Exception e) {
future.completeExceptionally(e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3763,18 +3763,18 @@ public Position getLastPosition() {

@Override
public CompletableFuture<Position> getLastDispatchablePosition() {
PositionImpl maxReadPosition = getMaxReadPosition();
// If `maxReadPosition` is not equal to `LastPosition`. It means that there are uncommitted transactions.
// so return `maxRedPosition` directly.
if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) {
return CompletableFuture.completedFuture(maxReadPosition);
} else {
return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
return !Markers.isServerOnlyMarker(md);
}, maxReadPosition);
}
return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
if (Markers.isServerOnlyMarker(md)) {
return false;
} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
// Filter-out transaction aborted messages.
TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits());
return !isTxnAborted(txnID, (PositionImpl) entry.getPosition());
}
return true;
}, getMaxReadPosition());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1978,4 +1978,73 @@ public void testDelayedDeliveryExceedsMaxDelay() throws Exception {
+ maxDeliveryDelayInMillis + " milliseconds");
}
}

@Test
public void testPersistentTopicGetLastDispatchablePositionWithTxn() throws Exception {
String topic = "persistent://" + NAMESPACE1 + "/testPersistentTopicGetLastDispatchablePositionWithTxn";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create();

BrokerService brokerService = pulsarTestContexts.get(0).getBrokerService();
PersistentTopic persistentTopic = (PersistentTopic) brokerService.getTopicReference(topic).get();


// send a normal message
String body = UUID.randomUUID().toString();
MessageIdImpl msgId = (MessageIdImpl) producer.send(body);

// send 3 txn messages
Transaction txn = pulsarClient.newTransaction().build().get();
producer.newMessage(txn).value(UUID.randomUUID().toString()).send();
producer.newMessage(txn).value(UUID.randomUUID().toString()).send();
producer.newMessage(txn).value(UUID.randomUUID().toString()).send();

// get last dispatchable position
PositionImpl lastDispatchablePosition = (PositionImpl) persistentTopic.getLastDispatchablePosition().get();
// the last dispatchable position should be the message id of the normal message
assertEquals(lastDispatchablePosition, PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()));

// abort the txn
txn.abort().get(5, TimeUnit.SECONDS);

// get last dispatchable position
lastDispatchablePosition = (PositionImpl) persistentTopic.getLastDispatchablePosition().get();
// the last dispatchable position should be the message id of the normal message
assertEquals(lastDispatchablePosition, PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()));


@Cleanup
Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.topic(topic)
.startMessageId(MessageId.earliest)
.create();
Transaction txn1 = pulsarClient.newTransaction().build().get();
producer.newMessage(txn1).value(UUID.randomUUID().toString()).send();
producer.newMessage(txn1).value(UUID.randomUUID().toString()).send();
producer.newMessage(txn1).value(UUID.randomUUID().toString()).send();
List<Message<String>> messages = new ArrayList<>();
while (reader.hasMessageAvailable()) {
messages.add(reader.readNext());
}
assertEquals(messages.size(), 1);
assertEquals(messages.get(0).getValue(), body);

txn1.abort().get(5, TimeUnit.SECONDS);

@Cleanup
Reader<String> reader1 = pulsarClient.newReader(Schema.STRING)
.topic(topic)
.startMessageId(MessageId.earliest)
.create();
List<Message<String>> messages1 = new ArrayList<>();
while (reader1.hasMessageAvailable()) {
messages1.add(reader1.readNext());
}
assertEquals(messages1.size(), 1);
assertEquals(messages1.get(0).getValue(), body);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,9 @@ public void testGetLastMessageIdsWithOngoingTransactions() throws Exception {
for (int i = 0; i < 3; i++) {
expectedLastMessageID = (MessageIdImpl) producer.newMessage().send();
}
assertMessageId(consumer, expectedLastMessageID);
assertGetLastMessageId(consumer, expectedLastMessageID);
// 2.2 Case2: send 2 ongoing transactional messages and 2 original messages.
// |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|.
// |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5.
Transaction txn1 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
Expand All @@ -291,25 +291,37 @@ public void testGetLastMessageIdsWithOngoingTransactions() throws Exception {
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
.get();

// |1:0|1:1|1:2|txn1:1:3|
producer.newMessage(txn1).send();
// expectedLastMessageID1 == 1:4

// |1:0|1:1|1:2|txn1:1:3|1:4|
MessageIdImpl expectedLastMessageID1 = (MessageIdImpl) producer.newMessage().send();

// |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|
producer.newMessage(txn2).send();
// expectedLastMessageID2 == 1:6
MessageIdImpl expectedLastMessageID2 = (MessageIdImpl) producer.newMessage().send();

// 2.2.1 Last message ID will not change when txn1 and txn2 do not end.
assertMessageId(consumer, expectedLastMessageID);
assertGetLastMessageId(consumer, expectedLastMessageID);

// 2.2.2 Last message ID will update to 1:4 when txn1 committed.
// |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|
// |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|
txn1.commit().get(5, TimeUnit.SECONDS);
assertMessageId(consumer, expectedLastMessageID1);
assertGetLastMessageId(consumer, expectedLastMessageID1);

// 2.2.3 Last message ID will update to 1:6 when txn2 aborted.
// |1:0|1:1|1:2|txn1:start->1:3|1:4|txn2:start->1:5|1:6|tx1:commit->1:7|tx2:abort->1:8|
// 2.2.3 Last message ID will still to 1:4 when txn2 aborted.
// |1:0|1:1|1:2|txn1:1:3|1:4|txn2:1:5|tx1:commit->1:6|tx2:abort->1:7|
txn2.abort().get(5, TimeUnit.SECONDS);
assertMessageId(consumer, expectedLastMessageID2);
assertGetLastMessageId(consumer, expectedLastMessageID1);

// Handle the case of the maxReadPosition < lastPosition, but it's an aborted transactional message.
Transaction txn3 = pulsarClient.newTransaction()
.build()
.get();
producer.newMessage(txn3).send();
assertGetLastMessageId(consumer, expectedLastMessageID1);
txn3.abort().get(5, TimeUnit.SECONDS);
assertGetLastMessageId(consumer, expectedLastMessageID1);
}

/**
Expand Down Expand Up @@ -368,7 +380,7 @@ private void triggerLedgerSwitch(String topicName) throws Exception{
});
}

private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected) throws Exception {
private void assertGetLastMessageId(Consumer<?> consumer, MessageIdImpl expected) throws Exception {
TopicMessageIdImpl actual = (TopicMessageIdImpl) consumer.getLastMessageIds().get(0);
assertEquals(expected.getEntryId(), actual.getEntryId());
assertEquals(expected.getLedgerId(), actual.getLedgerId());
Expand Down
Loading