Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat]: Support auto scaled consumer receiver queue #976

Merged
merged 7 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions perf/perf-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ import (

// ConsumeArgs define the parameters required by consume
type ConsumeArgs struct {
Topic string
SubscriptionName string
ReceiverQueueSize int
EnableBatchIndexAck bool
Topic string
SubscriptionName string
ReceiverQueueSize int
EnableBatchIndexAck bool
EnableAutoScaledReceiverQueueSize bool
}

func newConsumerCommand() *cobra.Command {
Expand All @@ -57,6 +58,8 @@ func newConsumerCommand() *cobra.Command {
flags.StringVarP(&consumeArgs.SubscriptionName, "subscription", "s", "sub", "Subscription name")
flags.IntVarP(&consumeArgs.ReceiverQueueSize, "receiver-queue-size", "r", 1000, "Receiver queue size")
flags.BoolVar(&consumeArgs.EnableBatchIndexAck, "enable-batch-index-ack", false, "Whether to enable batch index ACK")
flags.BoolVar(&consumeArgs.EnableAutoScaledReceiverQueueSize, "enable-auto-scaled-queue-size", false,
"Whether to enable auto scaled receiver queue size")

return cmd
}
Expand All @@ -76,9 +79,10 @@ func consume(consumeArgs *ConsumeArgs, stop <-chan struct{}) {
defer client.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: consumeArgs.Topic,
SubscriptionName: consumeArgs.SubscriptionName,
EnableBatchIndexAcknowledgment: consumeArgs.EnableBatchIndexAck,
Topic: consumeArgs.Topic,
SubscriptionName: consumeArgs.SubscriptionName,
EnableBatchIndexAcknowledgment: consumeArgs.EnableBatchIndexAck,
EnableAutoScaledReceiverQueueSize: consumeArgs.EnableAutoScaledReceiverQueueSize,
})

if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ type ConsumerOptions struct {
// Default value is `1000` messages and should be good for most use cases.
ReceiverQueueSize int

// EnableAutoScaledReceiverQueueSize, if enabled, the consumer receive queue will be auto-scaled
// by the consumer actual throughput. The ReceiverQueueSize will be the maximum size which consumer
// receive queue can be scaled.
// Default is false.
EnableAutoScaledReceiverQueueSize bool

// NackRedeliveryDelay specifies the delay after which to redeliver the messages that failed to be
// processed. Default is 1 min. (See `Consumer.Nack()`)
NackRedeliveryDelay time.Duration
Expand Down
1 change: 1 addition & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
consumerEventListener: c.options.EventListener,
enableBatchIndexAck: c.options.EnableBatchIndexAcknowledgment,
ackGroupingOptions: c.options.AckGroupingOptions,
autoReceiverQueueSize: c.options.EnableAutoScaledReceiverQueueSize,
}
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
Expand Down
113 changes: 97 additions & 16 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -77,6 +76,10 @@ const (
nonDurable
)

const (
initialReceiverQueueSize = 1
)

const (
noMessageEntry = -1
)
Expand All @@ -89,6 +92,7 @@ type partitionConsumerOpts struct {
subscriptionInitPos SubscriptionInitialPosition
partitionIdx int
receiverQueueSize int
autoReceiverQueueSize bool
nackRedeliveryDelay time.Duration
nackBackoffPolicy NackBackoffPolicy
metadata map[string]string
Expand Down Expand Up @@ -142,11 +146,15 @@ type partitionConsumer struct {
availablePermits *availablePermits

// the size of the queue channel for buffering messages
queueSize int32
maxQueueSize int32
queueCh chan []*message
startMessageID atomicMessageID
lastDequeuedMsg *trackingMessageID

currentQueueSize uAtomic.Int32
scaleReceiverQueueHint uAtomic.Bool
incomingMessages uAtomic.Int32

eventsCh chan interface{}
connectedCh chan struct{}
connectClosedCh chan connectionClosed
Expand Down Expand Up @@ -181,22 +189,44 @@ func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
}

type availablePermits struct {
permits int32
permits uAtomic.Int32
pc *partitionConsumer
}

func (p *availablePermits) inc() {
// atomic add availablePermits
ap := atomic.AddInt32(&p.permits, 1)
p.add(1)
}

func (p *availablePermits) add(delta int32) {
p.permits.Add(delta)
p.flowIfNeed()
}

func (p *availablePermits) reset() {
p.permits.Store(0)
}

func (p *availablePermits) get() int32 {
return p.permits.Load()
}

func (p *availablePermits) flowIfNeed() {
// TODO implement a better flow controller
// send more permits if needed
flowThreshold := int32(math.Max(float64(p.pc.queueSize/2), 1))
if ap >= flowThreshold {
availablePermits := ap
requestedPermits := ap
var flowThreshold int32
if p.pc.options.autoReceiverQueueSize {
flowThreshold = int32(math.Max(float64(p.pc.currentQueueSize.Load()/2), 1))
} else {
flowThreshold = int32(math.Max(float64(p.pc.maxQueueSize/2), 1))
}

current := p.get()
if current >= flowThreshold {
availablePermits := current
requestedPermits := current
// check if permits changed
if !atomic.CompareAndSwapInt32(&p.permits, ap, 0) {
if !p.permits.CAS(current, 0) {
return
}

Expand All @@ -207,10 +237,6 @@ func (p *availablePermits) inc() {
}
}

func (p *availablePermits) reset() {
atomic.StoreInt32(&p.permits, 0)
}

// atomicMessageID is a wrapper for trackingMessageID to make get and set atomic
type atomicMessageID struct {
msgID *trackingMessageID
Expand Down Expand Up @@ -292,7 +318,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: int32(options.partitionIdx),
eventsCh: make(chan interface{}, 10),
queueSize: int32(options.receiverQueueSize),
maxQueueSize: int32(options.receiverQueueSize),
queueCh: make(chan []*message, options.receiverQueueSize),
startMessageID: atomicMessageID{msgID: options.startMessageID},
connectedCh: make(chan struct{}),
Expand All @@ -305,6 +331,11 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
metrics: metrics,
schemaInfoCache: newSchemaInfoCache(client, options.topic),
}
if pc.options.autoReceiverQueueSize {
pc.currentQueueSize.Store(initialReceiverQueueSize)
} else {
pc.currentQueueSize.Store(int32(pc.options.receiverQueueSize))
}
pc.availablePermits = &availablePermits{pc: pc}
pc.chunkedMsgCtxMap = newChunkedMsgCtxMap(options.maxPendingChunkedMessage, pc)
pc.unAckChunksTracker = newUnAckChunksTracker(pc)
Expand Down Expand Up @@ -876,6 +907,12 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
orderingKey: string(msgMeta.OrderingKey),
},
}

if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Inc()
pc.markScaleIfNeed()
}

pc.queueCh <- messages
return nil
}
Expand Down Expand Up @@ -1045,6 +1082,11 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
messages = append(messages, msg)
}

if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Add(int32(len(messages)))
pc.markScaleIfNeed()
}

// send messages to the dispatcher
pc.queueCh <- messages
return nil
Expand Down Expand Up @@ -1212,7 +1254,6 @@ func (pc *partitionConsumer) dispatcher() {
pc.metrics.PrefetchedMessages.Dec()
pc.metrics.PrefetchedBytes.Sub(float64(len(messages[0].payLoad)))
} else {
// we are ready for more messages
queueCh = pc.queueCh
}

Expand All @@ -1230,7 +1271,13 @@ func (pc *partitionConsumer) dispatcher() {

// reset available permits
pc.availablePermits.reset()
initialPermits := uint32(pc.queueSize)

var initialPermits uint32
if pc.options.autoReceiverQueueSize {
initialPermits = uint32(pc.currentQueueSize.Load())
} else {
initialPermits = uint32(pc.maxQueueSize)
}

pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
// send initial permits
Expand All @@ -1254,6 +1301,11 @@ func (pc *partitionConsumer) dispatcher() {

pc.availablePermits.inc()

if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Dec()
pc.expectMoreIncomingMessages()
}

case clearQueueCb := <-pc.clearQueueCh:
// drain the message queue on any new connection by sending a
// special nil message to the channel so we know when to stop dropping messages
Expand All @@ -1269,6 +1321,9 @@ func (pc *partitionConsumer) dispatcher() {
} else if nextMessageInQueue == nil {
nextMessageInQueue = toTrackingMessageID(m[0].msgID)
}
if pc.options.autoReceiverQueueSize {
pc.incomingMessages.Sub(int32(len(m)))
}
}

messages = nil
Expand Down Expand Up @@ -1641,6 +1696,32 @@ func getPreviousMessage(mid *trackingMessageID) *trackingMessageID {
}
}

func (pc *partitionConsumer) expectMoreIncomingMessages() {
if !pc.options.autoReceiverQueueSize {
return
}
if pc.scaleReceiverQueueHint.CAS(true, false) {
oldSize := pc.currentQueueSize.Load()
maxSize := int32(pc.options.receiverQueueSize)
newSize := int32(math.Min(float64(maxSize), float64(oldSize*2)))
if newSize > oldSize {
pc.currentQueueSize.CAS(oldSize, newSize)
pc.availablePermits.add(newSize - oldSize)
pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize)
}
}
}

func (pc *partitionConsumer) markScaleIfNeed() {
// availablePermits + incomingMessages (messages in queueCh) is the number of prefetched messages
// The result of auto-scale we expected is currentQueueSize is slightly bigger than prefetched messages
prev := pc.scaleReceiverQueueHint.Swap(pc.availablePermits.get()+pc.incomingMessages.Load() >=
pc.currentQueueSize.Load())
if prev != pc.scaleReceiverQueueHint.Load() {
pc.log.Debugf("update scaleReceiverQueueHint from %t -> %t", prev, pc.scaleReceiverQueueHint.Load())
}
}

func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) {
providerEntry, ok := pc.compressionProviders.Load(msgMeta.GetCompression())
if !ok {
Expand Down
3 changes: 3 additions & 0 deletions pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil)

Expand Down Expand Up @@ -75,6 +76,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil)

Expand Down Expand Up @@ -110,6 +112,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
metrics: newTestMetrics(),
decryptor: crypto.NewNoopDecryptor(),
}
pc.availablePermits = &availablePermits{pc: &pc}
pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0},
func(id MessageID) { pc.sendIndividualAck(id) }, nil)

Expand Down
Loading