Skip to content

Commit

Permalink
Fix batched messages not ACKed correctly when batch index ACK is disa…
Browse files Browse the repository at this point in the history
…bled (#994)

* Fix batched messages not ACKed correctly when batch index ACK is disabled

Fixes #993

### Motivation

When batch index ACK is disabled, if N messages in a batch are
acknowledged, currently only the batched message ID of the last message
will be acknowledged. This behavior is wrong because we need to
acknowledge the whole batch.

### Modifications

- Create a `messageID` instance to ACK for this case
- Add `TestConsumerBatchIndexAckDisabled` to cover this case

* Fix ackRequest error

* Fix wrong received msg id

* Add comments
  • Loading branch information
BewareMyPower authored Mar 20, 2023
1 parent b8563cd commit 04ad521
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
8 changes: 8 additions & 0 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,14 @@ func (pc *partitionConsumer) ackID(msgID MessageID, withResponse bool) error {
trackingID := toTrackingMessageID(msgID)

if trackingID != nil && trackingID.ack() {
// All messages in the same batch have been acknowledged, we only need to acknowledge the
// MessageID that represents the entry that stores the whole batch
trackingID = &trackingMessageID{
messageID: &messageID{
ledgerID: trackingID.ledgerID,
entryID: trackingID.entryID,
},
}
pc.metrics.AcksCounter.Inc()
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9)
} else if !pc.options.enableBatchIndexAck {
Expand Down
40 changes: 40 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4073,3 +4073,43 @@ func TestConsumerWithAutoScaledQueueReceive(t *testing.T) {
return assert.Equal(t, 3, int(pc.currentQueueSize.Load()))
})
}

func TestConsumerBatchIndexAckDisabled(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
})
assert.Nil(t, err)
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})
assert.Nil(t, err)

for i := 0; i < 5; i++ {
producer.SendAsync(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-%d", i)),
}, nil)
}
for i := 0; i < 5; i++ {
message, err := consumer.Receive(context.Background())
assert.Nil(t, err)
consumer.Ack(message)
}
consumer.Close()
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
})
assert.Nil(t, err)
producer.Send(context.Background(), &ProducerMessage{Payload: []byte("done")})
message, err := consumer.Receive(context.Background())
assert.Nil(t, err)
assert.Equal(t, []byte("done"), message.Payload())
}

0 comments on commit 04ad521

Please sign in to comment.