Skip to content

Commit

Permalink
[fix][build] Fix compatibility issue introduced by #20750
Browse files Browse the repository at this point in the history
(cherry picked from commit 05ac1f9)
  • Loading branch information
liangyepianzhou authored and lhotari committed Aug 9, 2024
1 parent 651c8d8 commit 581ad60
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;

import static org.junit.Assert.assertTrue;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;

Expand Down Expand Up @@ -150,7 +151,7 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti
consumer.negativeAcknowledge(msg);
}

assertTrue(consumer instanceof ConsumerBase<String>);
assertTrue(consumer instanceof ConsumerBase);
assertEquals(((ConsumerBase<String>) consumer).getUnAckedMessageTracker().size(), 0);

Set<String> receivedMessages = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,20 @@ public void negativeAcknowledge(Message<?> message) {
negativeAcksTracker.add(message);

// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId()));
unAckedMessageTracker.remove(discardBatch(message.getMessageId()));
}

static MessageIdImpl discardBatch(MessageId messageId) {
if (messageId instanceof ChunkMessageIdImpl) {
return (MessageIdImpl) messageId;
}
MessageIdImpl msgId;
if (messageId instanceof TopicMessageIdImpl) {
msgId = (MessageIdImpl) ((TopicMessageIdImpl) messageId).getInnerMessageId();
} else {
msgId = (MessageIdImpl) messageId;
}
return new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex());
}

@Override
Expand Down

0 comments on commit 581ad60

Please sign in to comment.