diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 8ff339fed071c..ba35529d024ac 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -106,6 +106,9 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti log.info("Test negative acks batching={} partitions={} subType={} negAckDelayMs={}", batching, usePartitions, subscriptionType, negAcksDelayMillis); String topic = BrokerTestUtil.newUniqueName("testNegativeAcks"); + if (usePartitions) { + admin.topics().createPartitionedTopic(topic, 2); + } @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index 17238ece38ea6..6273f4d582e18 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -85,29 +85,10 @@ private synchronized void triggerRedelivery(Timeout t) { } public synchronized void add(MessageId messageId) { - if (messageId instanceof BatchMessageIdImpl) { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; - messageId = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(), - batchMessageId.getPartitionIndex()); - } - - if (nackedMessages == null) { - nackedMessages = new HashMap<>(); - } - nackedMessages.put(messageId, System.nanoTime() + nackDelayNanos); - - if (this.timeout == null) { - // Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for - // nack immediately following the current one will be batched into the same redeliver request. - this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS); - } + add(messageId, 0); } public synchronized void add(Message message) { - if (negativeAckRedeliveryBackoff == null) { - add(message.getMessageId()); - return; - } add(message.getMessageId(), message.getRedeliveryCount()); } @@ -127,7 +108,12 @@ private synchronized void add(MessageId messageId, int redeliveryCount) { nackedMessages = new HashMap<>(); } - long backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount)); + long backoffNs; + if (negativeAckRedeliveryBackoff != null) { + backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount)); + } else { + backoffNs = nackDelayNanos; + } nackedMessages.put(messageId, System.nanoTime() + backoffNs); if (this.timeout == null) {