Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][txn] Transaction cumulative ack redeliver change #14371

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -528,9 +527,10 @@ public CompletableFuture<Void> internalAbortTxn(TxnID txnId, Consumer consumer,
if (cumulativeAckOfTransaction.getKey().equals(txnId)) {
cumulativeAckOfTransaction = null;
}
//TODO: pendingAck handle next pr will fix
persistentSubscription.redeliverUnacknowledgedMessages(consumer, DEFAULT_CONSUMER_EPOCH);
abortFuture.complete(null);

// in cumulative ack with transaction, don't depend on server redeliver message,
// it will cause the messages to be out of order
}).exceptionally(e -> {
log.error("[{}] Transaction pending ack store abort txnId : [{}] fail!",
topicName, txnId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -1066,6 +1067,74 @@ public void testTxnTimeOutInClient() throws Exception{
}
}

@Test
public void testCumulativeAckRedeliverMessages() throws Exception {
String topic = NAMESPACE1 + "/testCumulativeAckRedeliverMessages";

int count = 5;
int transactionCumulativeAck = 3;
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.create();

// send 5 messages
for (int i = 0; i < count; i++) {
producer.send((i + "").getBytes(UTF_8));
}

Transaction transaction = getTxn();
Transaction invalidTransaction = getTxn();

Message<byte[]> message = null;
for (int i = 0; i < transactionCumulativeAck; i++) {
message = consumer.receive();
}

// receive transaction in order
assertEquals(message.getValue(), (transactionCumulativeAck - 1 + "").getBytes(UTF_8));

// ack the last message
consumer.acknowledgeCumulativeAsync(message.getMessageId(), transaction).get();

// another ack will throw TransactionConflictException
try {
consumer.acknowledgeCumulativeAsync(message.getMessageId(), invalidTransaction).get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
// abort transaction then redeliver messages
transaction.abort().get();
// consumer redeliver messages
consumer.redeliverUnacknowledgedMessages();
}

// receive the rest of the message
for (int i = 0; i < count; i++) {
message = consumer.receive();
congbobo184 marked this conversation as resolved.
Show resolved Hide resolved
}

Transaction commitTransaction = getTxn();

// receive the first message
assertEquals(message.getValue(), (count - 1 + "").getBytes(UTF_8));
// ack the end of the message
consumer.acknowledgeCumulativeAsync(message.getMessageId(), commitTransaction).get();

commitTransaction.commit().get();

// then redeliver will not receive any message
message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);
}

@Test
public void testSendTxnMessageTimeout() throws Exception {
String topic = NAMESPACE1 + "/testSendTxnMessageTimeout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,14 +626,6 @@ protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageId
TransactionImpl txn) {
CompletableFuture<Void> ackFuture;
if (txn != null && this instanceof ConsumerImpl) {

// it is okay that we register acked topic after sending the acknowledgements. because
// the transactional ack will not be visiable for consumers until the transaction is
// committed
if (ackType == AckType.Cumulative) {
txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
}

ackFuture = txn.registerAckedTopic(getTopic(), subscription)
.thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn));
// register the ackFuture as part of the transaction
Expand All @@ -649,13 +641,6 @@ protected CompletableFuture<Void> doAcknowledgeWithTxn(MessageId messageId, AckT
TransactionImpl txn) {
CompletableFuture<Void> ackFuture;
if (txn != null && (this instanceof ConsumerImpl)) {
// it is okay that we register acked topic after sending the acknowledgements. because
// the transactional ack will not be visiable for consumers until the transaction is
// committed
if (ackType == AckType.Cumulative) {
txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
}

ackFuture = txn.registerAckedTopic(getTopic(), subscription)
.thenCompose(ignored -> doAcknowledge(messageId, ackType, properties, txn));
// register the ackFuture as part of the transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -37,7 +36,6 @@
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.FutureUtil;

Expand All @@ -62,7 +60,6 @@ public class TransactionImpl implements Transaction , TimerTask {
private final Map<String, CompletableFuture<Void>> registerPartitionMap;
private final Map<Pair<String, String>, CompletableFuture<Void>> registerSubscriptionMap;
private final TransactionCoordinatorClientImpl tcClient;
private Map<ConsumerImpl<?>, Integer> cumulativeAckConsumers;

private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
private final ArrayList<CompletableFuture<Void>> ackFutureList;
Expand Down Expand Up @@ -122,9 +119,8 @@ public CompletableFuture<Void> registerProducedTopic(String topic) {
}
});
}
} else {
return completableFuture;
}
return completableFuture;
}

public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
Expand All @@ -147,22 +143,14 @@ public CompletableFuture<Void> registerAckedTopic(String topic, String subscript
}
});
}
} else {
return completableFuture;
}
return completableFuture;
}

public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {
ackFutureList.add(ackFuture);
}

public synchronized void registerCumulativeAckConsumer(ConsumerImpl<?> consumer) {
if (this.cumulativeAckConsumers == null) {
this.cumulativeAckConsumers = new HashMap<>();
}
cumulativeAckConsumers.put(consumer, 0);
}

@Override
public CompletableFuture<Void> commit() {
timeout.cancel();
Expand Down Expand Up @@ -202,16 +190,7 @@ public CompletableFuture<Void> abort() {
if (e != null) {
log.error(e.getMessage());
}
if (cumulativeAckConsumers != null) {
cumulativeAckConsumers.forEach((consumer, integer) ->
cumulativeAckConsumers
.putIfAbsent(consumer, consumer.clearIncomingMessagesAndGetMessageNumber()));
}
tcClient.abortAsync(new TxnID(txnIdMostBits, txnIdLeastBits)).whenComplete((vx, ex) -> {
if (cumulativeAckConsumers != null) {
cumulativeAckConsumers.forEach(ConsumerImpl::increaseAvailablePermits);
cumulativeAckConsumers.clear();
}

if (ex != null) {
if (ex instanceof TransactionNotFoundException
Expand Down