Skip to content

Commit

Permalink
[feat]: Support auto scaled consumer receiver queue (#976)
Browse files Browse the repository at this point in the history
* feat: support auto scaled receiver queue
  • Loading branch information
Gleiphir2769 authored Mar 15, 2023
1 parent 257a9c8 commit b8563cd
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 23 deletions.
18 changes: 11 additions & 7 deletions perf/perf-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ import (

// ConsumeArgs define the parameters required by consume
type ConsumeArgs struct {
Topic string
SubscriptionName string
ReceiverQueueSize int
EnableBatchIndexAck bool
Topic string
SubscriptionName string
ReceiverQueueSize int
EnableBatchIndexAck bool
EnableAutoScaledReceiverQueueSize bool
}

func newConsumerCommand() *cobra.Command {
Expand All @@ -57,6 +58,8 @@ func newConsumerCommand() *cobra.Command {
flags.StringVarP(&consumeArgs.SubscriptionName, "subscription", "s", "sub", "Subscription name")
flags.IntVarP(&consumeArgs.ReceiverQueueSize, "receiver-queue-size", "r", 1000, "Receiver queue size")
flags.BoolVar(&consumeArgs.EnableBatchIndexAck, "enable-batch-index-ack", false, "Whether to enable batch index ACK")
flags.BoolVar(&consumeArgs.EnableAutoScaledReceiverQueueSize, "enable-auto-scaled-queue-size", false,
"Whether to enable auto scaled receiver queue size")

return cmd
}
Expand All @@ -76,9 +79,10 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) {
defer client.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: consumeArgs.Topic,
SubscriptionName: consumeArgs.SubscriptionName,
EnableBatchIndexAcknowledgment: consumeArgs.EnableBatchIndexAck,
Topic: consumeArgs.Topic,
SubscriptionName: consumeArgs.SubscriptionName,
EnableBatchIndexAcknowledgment: consumeArgs.EnableBatchIndexAck,
EnableAutoScaledReceiverQueueSize: consumeArgs.EnableAutoScaledReceiverQueueSize,
})

if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ type ConsumerOptions struct {
// Default value is `1000` messages and should be good for most use cases.
ReceiverQueueSize int

// EnableAutoScaledReceiverQueueSize, if enabled, the consumer receive queue will be auto-scaled
// by the consumer actual throughput. The ReceiverQueueSize will be the maximum size which consumer
// receive queue can be scaled.
// Default is false.
EnableAutoScaledReceiverQueueSize bool

// NackRedeliveryDelay specifies the delay after which to redeliver the messages that failed to be
// processed. Default is 1 min. (See `Consumer.Nack()`)
NackRedeliveryDelay time.Duration
Expand Down
1 change: 1 addition & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
consumerEventListener: c.options.EventListener,
enableBatchIndexAck: c.options.EnableBatchIndexAcknowledgment,
ackGroupingOptions: c.options.AckGroupingOptions,
autoReceiverQueueSize: c.options.EnableAutoScaledReceiverQueueSize,
}
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
Expand Down
113 changes: 97 additions & 16 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -77,6 +76,10 @@ const (
nonDurable
)

const (
initialReceiverQueueSize = 1
)

const (
noMessageEntry = -1
)
Expand All @@ -89,6 +92,7 @@ type partitionConsumerOpts struct {
subscriptionInitPos SubscriptionInitialPosition
partitionIdx int
receiverQueueSize int
autoReceiverQueueSize bool
nackRedeliveryDelay time.Duration
nackBackoffPolicy NackBackoffPolicy
metadata map[string]string
Expand Down Expand Up @@ -142,11 +146,15 @@ type partitionConsumer struct {
availablePermits *availablePermits

// the size of the queue channel for buffering messages
queueSize int32
maxQueueSize int32
queueCh chan []*message
startMessageID atomicMessageID
lastDequeuedMsg *trackingMessageID

currentQueueSize uAtomic.Int32
scaleReceiverQueueHint uAtomic.Bool
incomingMessages uAtomic.Int32

eventsCh chan interface{}
connectedCh chan struct{}
connectClosedCh chan connectionClosed
Expand Down Expand Up @@ -181,22 +189,44 @@ func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
}

type availablePermits struct {
permits int32
permits uAtomic.Int32
pc *partitionConsumer
}

func (p *availablePermits) inc() {
// atomic add availablePermits
ap := atomic.AddInt32(&p.permits, 1)
p.add(1)
}

func (p *availablePermits) add(delta int32) {
p.permits.Add(delta)
p.flowIfNeed()
}

func (p *availablePermits) reset() {
p.permits.Store(0)
}

func (p *availablePermits) get() int32 {
return p.permits.Load()
}

func (p *availablePermits) flowIfNeed() {
// TODO implement a better flow controller
// send more permits if needed
flowThreshold := int32(math.Max(float64(p.pc.queueSize/2), 1))
if ap >= flowThreshold {
availablePermits := ap
requestedPermits := ap
var flowThreshold int32
if p.pc.options.autoReceiverQueueSize {
flowThreshold = int32(math.Max(float64(p.pc.currentQueueSize.Load()/2), 1))
} else {
flowThreshold = int32(math.Max(float64(p.pc.maxQueueSize/2), 1))
}

current := p.get()
if current >= flowThreshold {
availablePermits := current
requestedPermits := current
// check if permits changed
if !atomic.CompareAndSwapInt32(&p.permits, ap, 0) {
if !p.permits.CAS(current, 0) {
return
}

Expand All @@ -207,10 +237,6 @@ func (p *availablePermits) inc() {
}
}

func (p *availablePermits) reset() {
atomic.StoreInt32(&p.permits, 0)
}

// atomicMessageID is a wrapper for trackingMessageID to make get and set atomic
type atomicMessageID struct {
msgID *trackingMessageID
Expand Down Expand Up @@ -292,7 +318,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: int32(options.partitionIdx),
eventsCh: make(chan interface{}, 10),
queueSize: int32(options.receiverQueueSize),
maxQueueSize: int32(options.receiverQueueSize),
queueCh: make(chan []*message, options.receiverQueueSize),
startMessageID: atomicMessageID{msgID: options.startMessageID},
connectedCh: make(chan struct{}),
Expand All @@ -305,6 +331,11 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
metrics: metrics,
schemaInfoCache: newSchemaInfoCache(client, options.topic),
}
if pc.options.autoReceiverQueueSize {
pc.currentQueueSize.Store(initialReceiverQueueSize)
} else {
pc.currentQueueSize.Store(int32(pc.options.receiverQueueSize))
}
pc.availablePermits = &availablePermits{pc: pc}
pc.chunkedMsgCtxMap = newChunkedMsgCtxMap(options.maxPendingChunkedMessage, pc)
pc.unAckChunksTracker = newUnAckChunksTracker(pc)
Expand Down Expand Up @@ -904,6 +935,12 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
orderingKey: string(msgMeta.OrderingKey),
},
}

if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Inc()
pc.markScaleIfNeed()
}

pc.queueCh <- messages
return nil
}
Expand Down Expand Up @@ -1073,6 +1110,11 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
messages = append(messages, msg)
}

if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Add(int32(len(messages)))
pc.markScaleIfNeed()
}

// send messages to the dispatcher
pc.queueCh <- messages
return nil
Expand Down Expand Up @@ -1240,7 +1282,6 @@ func (pc *partitionConsumer) dispatcher() {
pc.metrics.PrefetchedMessages.Dec()
pc.metrics.PrefetchedBytes.Sub(float64(len(messages[0].payLoad)))
} else {
// we are ready for more messages
queueCh = pc.queueCh
}

Expand All @@ -1258,7 +1299,13 @@ func (pc *partitionConsumer) dispatcher() {

// reset available permits
pc.availablePermits.reset()
initialPermits := uint32(pc.queueSize)

var initialPermits uint32
if pc.options.autoReceiverQueueSize {
initialPermits = uint32(pc.currentQueueSize.Load())
} else {
initialPermits = uint32(pc.maxQueueSize)
}

pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
// send initial permits
Expand All @@ -1282,6 +1329,11 @@ func (pc *partitionConsumer) dispatcher() {

pc.availablePermits.inc()

if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Dec()
pc.expectMoreIncomingMessages()
}

case clearQueueCb := <-pc.clearQueueCh:
// drain the message queue on any new connection by sending a
// special nil message to the channel so we know when to stop dropping messages
Expand All @@ -1297,6 +1349,9 @@ func (pc *partitionConsumer) dispatcher() {
} else if nextMessageInQueue == nil {
nextMessageInQueue = toTrackingMessageID(m[0].msgID)
}
if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Sub(int32(len(m)))
}
}

messages = nil
Expand Down Expand Up @@ -1671,6 +1726,32 @@ func getPreviousMessage(mid *trackingMessageID) *trackingMessageID {
}
}

func (pc *partitionConsumer) expectMoreIncomingMessages() {
if !pc.options.autoReceiverQueueSize {
return
}
if pc.scaleReceiverQueueHint.CAS(true, false) {
oldSize := pc.currentQueueSize.Load()
maxSize := int32(pc.options.receiverQueueSize)
newSize := int32(math.Min(float64(maxSize), float64(oldSize*2)))
if newSize > oldSize {
pc.currentQueueSize.CAS(oldSize, newSize)
pc.availablePermits.add(newSize - oldSize)
pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize)
}
}
}

func (pc *partitionConsumer) markScaleIfNeed() {
// availablePermits + incomingMessages (messages in queueCh) is the number of prefetched messages
// The result of auto-scale we expected is currentQueueSize is slightly bigger than prefetched messages
prev := pc.scaleReceiverQueueHint.Swap(pc.availablePermits.get()+pc.incomingMessages.Load() >=
pc.currentQueueSize.Load())
if prev != pc.scaleReceiverQueueHint.Load() {
pc.log.Debugf("update scaleReceiverQueueHint from %t -> %t", prev, pc.scaleReceiverQueueHint.Load())
}
}

func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) {
providerEntry, ok := pc.compressionProviders.Load(msgMeta.GetCompression())
if !ok {
Expand Down
3 changes: 3 additions & 0 deletions pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)

Expand Down Expand Up @@ -75,6 +76,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)

Expand Down Expand Up @@ -110,6 +112,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil)

Expand Down
Loading

0 comments on commit b8563cd

Please sign in to comment.