Skip to content

Commit

Permalink
Optimize batch index ACK performance (#988)
Browse files Browse the repository at this point in the history
### Motivation

Currently, when `EnableBatchIndexAck` is true, the ACK performance is
very poor. There are two main reasons:
1. Acknowledgment by list is not supported. This means that even when N
   MessageIDs are grouped, N separate ACK requests are still sent.
2. The implementation of the ACK grouping tracker is wrong. Given a batch
   that has N messages, when batch index ACK is enabled, each MessageID
   is cached. However, after all N MessageIDs have arrived, the current
   implementation does not clear them.

### Modifications
- Add a `func(id []*pb.MessageIdData)` to the ACK grouping tracker. When
  flushing individual ACKs, construct the slice and wrap it in a
  `CommandAck` directly.
- Refactor the implementation of the ACK grouping tracker:
  - Do not save each MessageID instance, instead, save the ledger id and
    the entry id as the key of `pendingAcks`.
  - Release the mutex before calling ACK functions
- Add `TestTrackerPendingAcks` to verify the list of MessageIDs to ACK.

After this change, the ACK order is no longer guaranteed, so the
acknowledged MessageIDs are sorted in `ack_grouping_tracker_test.go`.
  • Loading branch information
BewareMyPower authored Mar 10, 2023
1 parent e269c42 commit 352c463
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 166 deletions.
281 changes: 137 additions & 144 deletions pulsar/ack_grouping_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package pulsar

import (
"sync"
"sync/atomic"
"time"

pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/bits-and-blooms/bitset"
)

Expand All @@ -40,7 +42,8 @@ type ackGroupingTracker interface {

func newAckGroupingTracker(options *AckGroupingOptions,
ackIndividual func(id MessageID),
ackCumulative func(id MessageID)) ackGroupingTracker {
ackCumulative func(id MessageID),
ackList func(ids []*pb.MessageIdData)) ackGroupingTracker {
if options == nil {
options = &AckGroupingOptions{
MaxSize: 1000,
Expand All @@ -56,38 +59,28 @@ func newAckGroupingTracker(options *AckGroupingOptions,
}

t := &timedAckGroupingTracker{
singleAcks: make([]MessageID, options.MaxSize),
pendingAcks: make(map[int64]*bitset.BitSet),
lastCumulativeAck: EarliestMessageID(),
ackIndividual: ackIndividual,
maxNumAcks: int(options.MaxSize),
ackCumulative: ackCumulative,
ackList: func(ids []MessageID) {
// TODO: support ack a list of MessageIDs
for _, id := range ids {
ackIndividual(id)
}
},
options: *options,
tick: time.NewTicker(time.Hour),
donCh: make(chan struct{}),
ackList: ackList,
pendingAcks: make(map[[2]uint64]*bitset.BitSet),
lastCumulativeAck: EarliestMessageID(),
}

if options.MaxTime > 0 {
t.tick = time.NewTicker(options.MaxTime)
} else {
t.tick.Stop()
t.ticker = time.NewTicker(options.MaxTime)
t.exitCh = make(chan struct{})
go func() {
for {
select {
case <-t.exitCh:
return
case <-t.ticker.C:
t.flush()
}
}
}()
}

go func() {
for {
select {
case <-t.donCh:
return
case <-t.tick.C:
t.flush()
}
}
}()
return t
}

Expand Down Expand Up @@ -117,157 +110,157 @@ func (i *immediateAckGroupingTracker) flushAndClean() {
// close is a no-op for the immediate tracker: the body is empty.
func (i *immediateAckGroupingTracker) close() {
}

// addAndCheckIfFull stores id in the next free slot of singleAcks and records
// it in pendingAcks, all under the write lock.
//
// The first time a batched id is seen for a given hash, a bit set with one set
// bit per message in the batch is registered; a non-batched id registers a nil
// entry instead. Whenever the entry holds a bit set, the bit at the id's batch
// index is cleared.
//
// It returns true when singleAcks has just been filled completely.
func (t *timedAckGroupingTracker) addAndCheckIfFull(id MessageID) bool {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	t.singleAcks[t.index] = id
	t.index++

	hash := messageIDHash(id)
	bits, known := t.pendingAcks[hash]
	switch {
	case known:
		// Reuse whatever was registered for this entry earlier.
	case messageIDIsBatch(id):
		bits = bitset.New(uint(id.BatchSize()))
		for pos, size := 0, int(id.BatchSize()); pos < size; pos++ {
			bits.Set(uint(pos))
		}
		t.pendingAcks[hash] = bits
	default:
		t.pendingAcks[hash] = nil
	}

	if bits != nil {
		bits.Clear(uint(id.BatchIdx()))
	}
	return t.index == len(t.singleAcks)
}

// tryUpdateLastCumulativeAck advances lastCumulativeAck to id when id compares
// greater than the current value, and marks a cumulative ack as required.
// Ids that compare equal or smaller leave the state untouched.
func (t *timedAckGroupingTracker) tryUpdateLastCumulativeAck(id MessageID) {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	if messageIDCompare(t.lastCumulativeAck, id) >= 0 {
		return
	}
	t.lastCumulativeAck = id
	t.cumulativeAckRequired = true
}

// flushIndividualAcks sends every buffered individual acknowledgment through
// ackList and then drops the corresponding entries from pendingAcks.
// The whole operation, including the ackList call, runs under the write lock.
func (t *timedAckGroupingTracker) flushIndividualAcks() {
t.mutex.Lock()
defer t.mutex.Unlock()
if t.index > 0 {
// Only the first t.index slots of singleAcks hold buffered ids.
t.ackList(t.singleAcks[0:t.index])
for _, id := range t.singleAcks[0:t.index] {
key := messageIDHash(id)
ackSet, found := t.pendingAcks[key]
if !found {
continue
}
if ackSet == nil {
// nil entry: the id was registered without a bit set.
delete(t.pendingAcks, key)
} else {
ackSet.Clear(uint(id.BatchIdx()))
if ackSet.None() { // all messages have been acknowledged
delete(t.pendingAcks, key)
}
}
// NOTE(review): this delete uses the same hash as key and runs for every found
// entry, so the entry is removed even when the bit set above still has set
// bits. Confirm whether partially acknowledged batches were meant to be kept.
delete(t.pendingAcks, messageIDHash(id))
}
// Mark the buffer as empty; the slots are overwritten by later adds.
t.index = 0
}
}

// flushCumulativeAck sends lastCumulativeAck through ackCumulative if a
// cumulative ack is pending, then clears the pending flag. The callback is
// invoked while the write lock is held.
func (t *timedAckGroupingTracker) flushCumulativeAck() {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	if !t.cumulativeAckRequired {
		return
	}
	t.ackCumulative(t.lastCumulativeAck)
	t.cumulativeAckRequired = false
}

// clean resets the tracker to its initial state under the write lock: a fresh
// singleAcks buffer of the same length, an empty pendingAcks map, the earliest
// message id as the last cumulative ack, and no cumulative ack pending.
func (t *timedAckGroupingTracker) clean() {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	t.index = 0
	t.singleAcks = make([]MessageID, len(t.singleAcks))
	t.pendingAcks = make(map[int64]*bitset.BitSet)
	t.lastCumulativeAck = EarliestMessageID()
	t.cumulativeAckRequired = false
}

// timedAckGroupingTracker holds the state used to group acknowledgments:
// the pending individual acks, the latest cumulative ack, and the callbacks
// used to send them.
type timedAckGroupingTracker struct {
singleAcks []MessageID
index int
sync.RWMutex

// Key is the hash code of the ledger id and the entry id,
// maxNumAcks is the number of pendingAcks entries at which tryAddIndividual
// hands the whole map back to be flushed.
maxNumAcks int
ackCumulative func(id MessageID)
ackList func(ids []*pb.MessageIdData)
ticker *time.Ticker

// Key is the pair of the ledger id and the entry id,
// Value is the bit set that represents which messages are acknowledged if the entry stores a batch.
// The bit 1 represents the message has not been acknowledged yet, i.e. the bits "111" represents all
// messages in the batch whose batch size is 3 are not acknowledged.
// After the 1st message (i.e. batch index is 0) is acknowledged, the bits will become "011".
// Value is nil if the entry represents a single message.
pendingAcks map[int64]*bitset.BitSet
pendingAcks map[[2]uint64]*bitset.BitSet

lastCumulativeAck MessageID
cumulativeAckRequired bool

ackIndividual func(id MessageID)
ackCumulative func(id MessageID)
ackList func(ids []MessageID)

options AckGroupingOptions
donCh chan struct{}
tick *time.Ticker
// cumulativeAckRequired is read and written with sync/atomic; 1 means a
// cumulative ack is waiting to be flushed.
cumulativeAckRequired int32

mutex sync.RWMutex
// exitCh is closed by close() to stop the ticker goroutine; it is only
// created when a flush interval is configured.
exitCh chan struct{}
}

// add records an individual acknowledgment for id and, when the tracker
// reports that it has reached capacity, flushes the accumulated individual
// acknowledgments.
func (t *timedAckGroupingTracker) add(id MessageID) {
if t.addAndCheckIfFull(id) {
t.flushIndividualAcks()
if t.options.MaxTime > 0 {
t.tick.Reset(t.options.MaxTime)
if acks := t.tryAddIndividual(id); acks != nil {
t.flushIndividual(acks)
}
}

// tryAddIndividual records id in pendingAcks under the write lock, keyed by
// its (ledger id, entry id) pair.
//
// For an id with a non-negative batch index and a positive batch size, the
// first sighting of the entry registers a bit set with every bit set (or nil
// when the batch size is 1), and the bit at the id's batch index is then
// cleared. Any other id registers a nil value, overwriting what was there.
//
// When the number of tracked entries reaches maxNumAcks, the current map is
// detached and returned so the caller can flush it without holding the lock,
// and an empty map takes its place. Otherwise nil is returned.
func (t *timedAckGroupingTracker) tryAddIndividual(id MessageID) map[[2]uint64]*bitset.BitSet {
	t.Lock()
	defer t.Unlock()

	entry := [2]uint64{uint64(id.LedgerID()), uint64(id.EntryID())}
	idx, size := id.BatchIdx(), id.BatchSize()

	if idx < 0 || size <= 0 {
		// Not a batched id: acknowledge the entry as a whole.
		t.pendingAcks[entry] = nil
	} else {
		remaining, known := t.pendingAcks[entry]
		if !known {
			if size > 1 {
				remaining = bitset.New(uint(size))
				for pos := uint(0); pos < uint(size); pos++ {
					remaining.Set(pos)
				}
			}
			t.pendingAcks[entry] = remaining
		}
		if remaining != nil {
			remaining.Clear(uint(idx))
		}
	}

	if len(t.pendingAcks) < t.maxNumAcks {
		return nil
	}
	full := t.pendingAcks
	t.pendingAcks = make(map[[2]uint64]*bitset.BitSet)
	return full
}

// addCumulative offers id as the new cumulative acknowledgment position.
// When no periodic flush is configured (no positive MaxTime / no ticker), the
// cumulative ack is sent right away instead of waiting for a flush.
func (t *timedAckGroupingTracker) addCumulative(id MessageID) {
t.tryUpdateLastCumulativeAck(id)
if t.options.MaxTime <= 0 {
t.flushCumulativeAck()
if t.tryUpdateCumulative(id) && t.ticker == nil {
t.ackCumulative(id)
}
}

// tryUpdateCumulative advances lastCumulativeAck to id when id compares
// greater than the current value. On success it atomically sets the
// cumulativeAckRequired flag to 1 and returns true; otherwise it changes
// nothing and returns false.
func (t *timedAckGroupingTracker) tryUpdateCumulative(id MessageID) bool {
	t.Lock()
	defer t.Unlock()

	if messageIDCompare(t.lastCumulativeAck, id) >= 0 {
		return false
	}
	t.lastCumulativeAck = id
	atomic.StoreInt32(&t.cumulativeAckRequired, 1)
	return true
}

// isDuplicate reports whether id has already been acknowledged through this
// tracker: either it is not newer than the last cumulative ack, or it is
// recorded in pendingAcks (as a whole entry, or with the bit at its batch
// index already cleared).
func (t *timedAckGroupingTracker) isDuplicate(id MessageID) bool {
t.mutex.RLock()
t.RLock()
defer t.RUnlock()
if messageIDCompare(t.lastCumulativeAck, id) >= 0 {
t.mutex.RUnlock()
return true
}
ackSet, found := t.pendingAcks[messageIDHash(id)]
if !found {
t.mutex.RUnlock()
return false
}
t.mutex.RUnlock()
if ackSet == nil || !messageIDIsBatch(id) {
// NOTE: should we panic when ackSet != nil and messageIDIsBatch(id) is true?
return true
key := [2]uint64{uint64(id.LedgerID()), uint64(id.EntryID())}
if bs, found := t.pendingAcks[key]; found {
if bs == nil {
return true
}
if !bs.Test(uint(id.BatchIdx())) {
return true
}
}
// 0 represents the message has been acknowledged
return !ackSet.Test(uint(id.BatchIdx()))
return false
}

// flush sends all pending individual acknowledgments and then, if a
// cumulative acknowledgment is pending, sends the last cumulative ack.
func (t *timedAckGroupingTracker) flush() {
t.flushIndividualAcks()
t.flushCumulativeAck()
if acks := t.clearPendingAcks(); len(acks) > 0 {
t.flushIndividual(acks)
}
if atomic.CompareAndSwapInt32(&t.cumulativeAckRequired, 1, 0) {
t.RLock()
id := t.lastCumulativeAck
t.RUnlock()
t.ackCumulative(id)
}
}

// flushAndClean sends all pending acknowledgments like flush and also resets
// the tracked cumulative position.
// NOTE(review): lastCumulativeAck is reset to EarliestMessageID() only inside
// the branch taken when a cumulative ack was pending; confirm that leaving it
// untouched otherwise is intended.
func (t *timedAckGroupingTracker) flushAndClean() {
t.flush()
t.clean()
if acks := t.clearPendingAcks(); len(acks) > 0 {
t.flushIndividual(acks)
}
if atomic.CompareAndSwapInt32(&t.cumulativeAckRequired, 1, 0) {
t.Lock()
id := t.lastCumulativeAck
t.lastCumulativeAck = EarliestMessageID()
t.Unlock()
t.ackCumulative(id)
}
}

// clearPendingAcks detaches the current pendingAcks map and installs an empty
// one in its place, all under the write lock. The detached map is returned so
// the caller can flush it after the lock has been released.
func (t *timedAckGroupingTracker) clearPendingAcks() map[[2]uint64]*bitset.BitSet {
	t.Lock()
	drained := t.pendingAcks
	t.pendingAcks = make(map[[2]uint64]*bitset.BitSet)
	t.Unlock()
	return drained
}

// close flushes everything that is still pending and then signals the
// background flush goroutine to exit by closing its channel.
func (t *timedAckGroupingTracker) close() {
t.flushAndClean()
close(t.donCh)
if t.exitCh != nil {
close(t.exitCh)
}
}

// flushIndividual converts pendingAcks into a list of MessageIdData and hands
// the whole list to ackList in a single call.
//
// Each map key supplies the ledger id and the entry id. When the value is a
// bit set that still has set bits, its words are copied into AckSet;
// otherwise AckSet is left unset. Map iteration makes the order of the
// resulting list unspecified.
func (t *timedAckGroupingTracker) flushIndividual(pendingAcks map[[2]uint64]*bitset.BitSet) {
	ids := make([]*pb.MessageIdData, 0, len(pendingAcks))
	for entry, remaining := range pendingAcks {
		// Per-iteration copies so every MessageIdData points at its own values.
		ledgerID, entryID := entry[0], entry[1]
		data := &pb.MessageIdData{LedgerId: &ledgerID, EntryId: &entryID}
		if remaining != nil && !remaining.None() {
			words := remaining.Bytes()
			ackSet := make([]int64, 0, len(words))
			for _, word := range words {
				ackSet = append(ackSet, int64(word))
			}
			data.AckSet = ackSet
		}
		ids = append(ids, data)
	}
	t.ackList(ids)
}
Loading

0 comments on commit 352c463

Please sign in to comment.