Skip to content

Commit

Permalink
[fix][client] Fix negative ack not redelivery. (#15312)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Apr 26, 2022
1 parent 7086ac4 commit 9f6532a
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti
log.info("Test negative acks batching={} partitions={} subType={} negAckDelayMs={}", batching, usePartitions,
subscriptionType, negAcksDelayMillis);
String topic = BrokerTestUtil.newUniqueName("testNegativeAcks");
if (usePartitions) {
admin.topics().createPartitionedTopic(topic, 2);
}

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,29 +85,10 @@ private synchronized void triggerRedelivery(Timeout t) {
}

public synchronized void add(MessageId messageId) {
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
messageId = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex());
}

if (nackedMessages == null) {
nackedMessages = new HashMap<>();
}
nackedMessages.put(messageId, System.nanoTime() + nackDelayNanos);

if (this.timeout == null) {
// Schedule a task and group all the redeliveries for same period. Leave a small buffer to allow for
// nack immediately following the current one will be batched into the same redeliver request.
this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
}
add(messageId, 0);
}

public synchronized void add(Message<?> message) {
if (negativeAckRedeliveryBackoff == null) {
add(message.getMessageId());
return;
}
add(message.getMessageId(), message.getRedeliveryCount());
}

Expand All @@ -127,7 +108,12 @@ private synchronized void add(MessageId messageId, int redeliveryCount) {
nackedMessages = new HashMap<>();
}

long backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount));
long backoffNs;
if (negativeAckRedeliveryBackoff != null) {
backoffNs = TimeUnit.MILLISECONDS.toNanos(negativeAckRedeliveryBackoff.next(redeliveryCount));
} else {
backoffNs = nackDelayNanos;
}
nackedMessages.put(messageId, System.nanoTime() + backoffNs);

if (this.timeout == null) {
Expand Down

0 comments on commit 9f6532a

Please sign in to comment.