diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 1a159974700b7..c74fb3e921775 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -21,7 +21,6 @@ import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet; import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet; import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap; -import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -528,9 +527,10 @@ public CompletableFuture internalAbortTxn(TxnID txnId, Consumer consumer, if (cumulativeAckOfTransaction.getKey().equals(txnId)) { cumulativeAckOfTransaction = null; } - //TODO: pendingAck handle next pr will fix - persistentSubscription.redeliverUnacknowledgedMessages(consumer, DEFAULT_CONSUMER_EPOCH); abortFuture.complete(null); + + // in cumulative ack with transaction, don't depend on server redeliver message, + // it will cause the messages to be out of order }).exceptionally(e -> { log.error("[{}] Transaction pending ack store abort txnId : [{}] fail!", topicName, txnId, e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 3a40982fed48d..b372f0b61c5f7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import io.netty.channel.ChannelHandlerContext; @@ -1066,6 +1067,74 @@ public void testTxnTimeOutInClient() throws Exception{ } } + @Test + public void testCumulativeAckRedeliverMessages() throws Exception { + String topic = NAMESPACE1 + "/testCumulativeAckRedeliverMessages"; + + int count = 5; + int transactionCumulativeAck = 3; + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("test") + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .sendTimeout(0, TimeUnit.SECONDS) + .create(); + + // send 5 messages + for (int i = 0; i < count; i++) { + producer.send((i + "").getBytes(UTF_8)); + } + + Transaction transaction = getTxn(); + Transaction invalidTransaction = getTxn(); + + Message message = null; + for (int i = 0; i < transactionCumulativeAck; i++) { + message = consumer.receive(); + } + + // receive transaction in order + assertEquals(message.getValue(), (transactionCumulativeAck - 1 + "").getBytes(UTF_8)); + + // ack the last message + consumer.acknowledgeCumulativeAsync(message.getMessageId(), transaction).get(); + + // another ack will throw TransactionConflictException + try { + consumer.acknowledgeCumulativeAsync(message.getMessageId(), invalidTransaction).get(); + fail(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException); + // abort transaction then redeliver messages + transaction.abort().get(); + // consumer redeliver messages + consumer.redeliverUnacknowledgedMessages(); + } + + // receive the rest of the message + for (int i = 0; i < count; i++) { + message = consumer.receive(); + } + + Transaction commitTransaction = getTxn(); + + // receive the first message + assertEquals(message.getValue(), (count - 1 + "").getBytes(UTF_8)); + // ack the end of the message + consumer.acknowledgeCumulativeAsync(message.getMessageId(), commitTransaction).get(); + + commitTransaction.commit().get(); + + // then redeliver will not receive any message + message = consumer.receive(3, TimeUnit.SECONDS); + assertNull(message); + } + @Test public void testSendTxnMessageTimeout() throws Exception { String topic = NAMESPACE1 + "/testSendTxnMessageTimeout"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index aeb1b9ed7db4e..4a60dad8bfc15 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -626,14 +626,6 @@ protected CompletableFuture doAcknowledgeWithTxn(List messageId TransactionImpl txn) { CompletableFuture ackFuture; if (txn != null && this instanceof ConsumerImpl) { - - // it is okay that we register acked topic after sending the acknowledgements. because - // the transactional ack will not be visiable for consumers until the transaction is - // committed - if (ackType == AckType.Cumulative) { - txn.registerCumulativeAckConsumer((ConsumerImpl) this); - } - ackFuture = txn.registerAckedTopic(getTopic(), subscription) .thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn)); // register the ackFuture as part of the transaction @@ -649,13 +641,6 @@ protected CompletableFuture doAcknowledgeWithTxn(MessageId messageId, AckT TransactionImpl txn) { CompletableFuture ackFuture; if (txn != null && (this instanceof ConsumerImpl)) { - // it is okay that we register acked topic after sending the acknowledgements. because - // the transactional ack will not be visiable for consumers until the transaction is - // committed - if (ackType == AckType.Cumulative) { - txn.registerCumulativeAckConsumer((ConsumerImpl) this); - } - ackFuture = txn.registerAckedTopic(getTopic(), subscription) .thenCompose(ignored -> doAcknowledge(messageId, ackType, properties, txn)); // register the ackFuture as part of the transaction diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java index aa7a18047ded8..55b20438693e3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java @@ -22,7 +22,6 @@ import io.netty.util.Timeout; import io.netty.util.TimerTask; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -37,7 +36,6 @@ import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException; import org.apache.pulsar.client.api.transaction.TxnID; -import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.util.FutureUtil; @@ -62,7 +60,6 @@ public class TransactionImpl implements Transaction , TimerTask { private final Map> registerPartitionMap; private final Map, CompletableFuture> registerSubscriptionMap; private final TransactionCoordinatorClientImpl tcClient; - private Map, Integer> cumulativeAckConsumers; private final ArrayList> sendFutureList; private final ArrayList> ackFutureList; @@ -122,9 +119,8 @@ public CompletableFuture registerProducedTopic(String topic) { } }); } - } else { - return completableFuture; } + return completableFuture; } public synchronized void registerSendOp(CompletableFuture sendFuture) { @@ -147,22 +143,14 @@ public CompletableFuture registerAckedTopic(String topic, String subscript } }); } - } else { - return completableFuture; } + return completableFuture; } public synchronized void registerAckOp(CompletableFuture ackFuture) { ackFutureList.add(ackFuture); } - public synchronized void registerCumulativeAckConsumer(ConsumerImpl consumer) { - if (this.cumulativeAckConsumers == null) { - this.cumulativeAckConsumers = new HashMap<>(); - } - cumulativeAckConsumers.put(consumer, 0); - } - @Override public CompletableFuture commit() { timeout.cancel(); @@ -202,16 +190,7 @@ public CompletableFuture abort() { if (e != null) { log.error(e.getMessage()); } - if (cumulativeAckConsumers != null) { - cumulativeAckConsumers.forEach((consumer, integer) -> - cumulativeAckConsumers - .putIfAbsent(consumer, consumer.clearIncomingMessagesAndGetMessageNumber())); - } tcClient.abortAsync(new TxnID(txnIdMostBits, txnIdLeastBits)).whenComplete((vx, ex) -> { - if (cumulativeAckConsumers != null) { - cumulativeAckConsumers.forEach(ConsumerImpl::increaseAvailablePermits); - cumulativeAckConsumers.clear(); - } if (ex != null) { if (ex instanceof TransactionNotFoundException